From 6c7d5645c9e07310e48553b4cbda78db1968be60 Mon Sep 17 00:00:00 2001 From: Cyril Tovena <cyril.tovena@gmail.com> Date: Thu, 9 May 2019 20:32:13 -0400 Subject: [PATCH] implements json stage metrics --- pkg/logentry/counters.go | 71 --------- pkg/logentry/metric/counters.go | 25 +++ pkg/logentry/{ => metric}/counters_test.go | 5 +- pkg/logentry/metric/entries.go | 27 ++++ pkg/logentry/metric/gauges.go | 25 +++ pkg/logentry/metric/histograms.go | 26 ++++ pkg/logentry/metric/metricvec.go | 46 ++++++ pkg/logentry/stages/extensions.go | 2 +- pkg/logentry/stages/json.go | 167 +++++++++++++-------- pkg/logentry/stages/json_test.go | 6 +- pkg/logentry/stages/metrics.go | 103 +++++++++++++ pkg/logentry/stages/stage.go | 8 + pkg/logentry/stages/util_test.go | 4 + 13 files changed, 373 insertions(+), 142 deletions(-) delete mode 100644 pkg/logentry/counters.go create mode 100644 pkg/logentry/metric/counters.go rename pkg/logentry/{ => metric}/counters_test.go (96%) create mode 100644 pkg/logentry/metric/entries.go create mode 100644 pkg/logentry/metric/gauges.go create mode 100644 pkg/logentry/metric/histograms.go create mode 100644 pkg/logentry/metric/metricvec.go create mode 100644 pkg/logentry/stages/metrics.go diff --git a/pkg/logentry/counters.go b/pkg/logentry/counters.go deleted file mode 100644 index 7d15bf06..00000000 --- a/pkg/logentry/counters.go +++ /dev/null @@ -1,71 +0,0 @@ -package logentry - -import ( - "sync" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" - - "github.com/grafana/loki/pkg/promtail/api" - "github.com/grafana/loki/pkg/util" -) - -type counters struct { - name, help string - mtx sync.Mutex - counters map[model.Fingerprint]prometheus.Counter -} - -func newCounters(name, help string) *counters { - return &counters{ - counters: map[model.Fingerprint]prometheus.Counter{}, - help: help, - name: name, - } -} - -func (c *counters) Describe(ch chan<- *prometheus.Desc) {} - -func (c *counters) Collect(ch chan<- prometheus.Metric) { - c.mtx.Lock() - defer c.mtx.Unlock() - for _, m := range c.counters { - ch <- m - } -} - -func (c *counters) With(labels model.LabelSet) prometheus.Counter { - c.mtx.Lock() - defer c.mtx.Unlock() - fp := labels.Fingerprint() - var ok bool - var counter prometheus.Counter - if counter, ok = c.counters[fp]; !ok { - counter = prometheus.NewCounter(prometheus.CounterOpts{ - Help: c.help, - Name: c.name, - ConstLabels: util.ModelLabelSetToMap(labels), - }) - c.counters[fp] = counter - } - return counter -} - -func logCount(reg prometheus.Registerer) api.EntryHandler { - c := newCounters("log_entries_total", "the total count of log entries") - reg.MustRegister(c) - return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error { - c.With(labels).Inc() - return nil - }) -} - -func logSize(reg prometheus.Registerer) api.EntryHandler { - c := newCounters("log_entries_bytes", "the total count of bytes") - reg.MustRegister(c) - return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error { - c.With(labels).Add(float64(len(entry))) - return nil - }) -} diff --git a/pkg/logentry/metric/counters.go b/pkg/logentry/metric/counters.go new file mode 100644 index 00000000..03f5ff46 --- /dev/null +++ b/pkg/logentry/metric/counters.go @@ -0,0 +1,25 @@ +package metric + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" +) + +type Counters struct { + *metricVec +} + +func NewCounters(name, help string) *Counters { + return &Counters{ + metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric { + return prometheus.NewCounter(prometheus.CounterOpts{ + Help: help, + Name: name, + ConstLabels: labels, + }) + })} +} + +func (c *Counters) With(labels model.LabelSet) prometheus.Counter { + return c.metricVec.With(labels).(prometheus.Counter) +} diff --git a/pkg/logentry/counters_test.go b/pkg/logentry/metric/counters_test.go similarity index 96% rename from pkg/logentry/counters_test.go rename to pkg/logentry/metric/counters_test.go index 59746032..7bb4c4b9 100644 --- a/pkg/logentry/counters_test.go +++ b/pkg/logentry/metric/counters_test.go @@ -1,4 +1,4 @@ -package logentry +package metric import ( "strings" @@ -22,9 +22,8 @@ log_entries_total{bar="foo",foo="bar"} 5.0 func Test_logCount(t *testing.T) { t.Parallel() - reg := prometheus.NewRegistry() - handler := logCount(reg) + handler := LogCount(reg) workerCount := 5 var wg sync.WaitGroup diff --git a/pkg/logentry/metric/entries.go b/pkg/logentry/metric/entries.go new file mode 100644 index 00000000..9d656ff2 --- /dev/null +++ b/pkg/logentry/metric/entries.go @@ -0,0 +1,27 @@ +package metric + +import ( + "time" + + "github.com/grafana/loki/pkg/promtail/api" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" +) + +func LogCount(reg prometheus.Registerer) api.EntryHandler { + c := NewCounters("log_entries_total", "the total count of log entries") + reg.MustRegister(c) + return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error { + c.With(labels).Inc() + return nil + }) +} + +func LogSize(reg prometheus.Registerer) api.EntryHandler { + c := NewCounters("log_entries_bytes", "the total count of bytes") + reg.MustRegister(c) + return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error { + c.With(labels).Add(float64(len(entry))) + return nil + }) +} diff --git a/pkg/logentry/metric/gauges.go b/pkg/logentry/metric/gauges.go new file mode 100644 index 00000000..24a4e16b --- /dev/null +++ b/pkg/logentry/metric/gauges.go @@ -0,0 +1,25 @@ +package metric + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" +) + +type Gauges struct { + *metricVec +} + +func NewGauges(name, help string) *Gauges { + return &Gauges{ + metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric { + return prometheus.NewGauge(prometheus.GaugeOpts{ + Help: help, + Name: name, + ConstLabels: labels, + }) + })} +} + +func (g *Gauges) With(labels model.LabelSet) prometheus.Gauge { + return g.metricVec.With(labels).(prometheus.Gauge) +} diff --git a/pkg/logentry/metric/histograms.go b/pkg/logentry/metric/histograms.go new file mode 100644 index 00000000..9bfc2560 --- /dev/null +++ b/pkg/logentry/metric/histograms.go @@ -0,0 +1,26 @@ +package metric + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" +) + +type Histograms struct { + *metricVec +} + +func NewHistograms(name, help string, buckets []float64) *Histograms { + return &Histograms{ + metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric { + return prometheus.NewHistogram(prometheus.HistogramOpts{ + Help: help, + Name: name, + ConstLabels: labels, + Buckets: buckets, + }) + })} +} + +func (h *Histograms) With(labels model.LabelSet) prometheus.Histogram { + return h.metricVec.With(labels).(prometheus.Histogram) +} diff --git a/pkg/logentry/metric/metricvec.go b/pkg/logentry/metric/metricvec.go new file mode 100644 index 00000000..39dc1812 --- /dev/null +++ b/pkg/logentry/metric/metricvec.go @@ -0,0 +1,46 @@ +package metric + +import ( + "sync" + + "github.com/grafana/loki/pkg/util" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" +) + +type metricVec struct { + factory func(labels map[string]string) prometheus.Metric + mtx sync.Mutex + metrics map[model.Fingerprint]prometheus.Metric +} + +func newMetricVec(factory func(labels map[string]string) prometheus.Metric) *metricVec { + return &metricVec{ + metrics: map[model.Fingerprint]prometheus.Metric{}, + factory: factory, + } +} + +func (c *metricVec) Describe(ch chan<- *prometheus.Desc) {} + +func (c *metricVec) Collect(ch chan<- prometheus.Metric) { + c.mtx.Lock() + defer c.mtx.Unlock() + for _, m := range c.metrics { + ch <- m + } +} + +func (c *metricVec) With(labels model.LabelSet) prometheus.Metric { + c.mtx.Lock() + defer c.mtx.Unlock() + fp := labels.Fingerprint() + var ok bool + var metric prometheus.Metric + if metric, ok = c.metrics[fp]; !ok { + metric = c.factory(util.ModelLabelSetToMap(labels)) + c.metrics[fp] = metric + } + return metric +} diff --git a/pkg/logentry/stages/extensions.go b/pkg/logentry/stages/extensions.go index a99c2be5..338eb4ab 100644 --- a/pkg/logentry/stages/extensions.go +++ b/pkg/logentry/stages/extensions.go @@ -20,7 +20,7 @@ func NewDocker(logger log.Logger) (Stage, error) { "source": "log", }, } - return NewJSON(logger, config) + return NewJSON(logger, config, nil) } // NewCRI creates a CRI format specific pipeline stage diff --git a/pkg/logentry/stages/json.go b/pkg/logentry/stages/json.go index 17e325e2..eb1603ea 100644 --- a/pkg/logentry/stages/json.go +++ b/pkg/logentry/stages/json.go @@ -10,6 +10,7 @@ import ( "github.com/jmespath/go-jmespath" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" ) @@ -34,6 +35,7 @@ type JSONConfig struct { Timestamp *JSONTimestamp `mapstructure:"timestamp"` Output *JSONOutput `mapstructure:"output"` Labels map[string]*JSONLabel `mapstructure:"labels"` + Metrics MetricsConfig `mapstructure:"metrics"` } func newJSONConfig(config interface{}) (*JSONConfig, error) { @@ -90,6 +92,16 @@ func (c *JSONConfig) validate() (map[string]*jmespath.JMESPath, error) { } } } + + // metrics expressions. + for _, mcfg := range c.Metrics { + if mcfg.Source != nil { + expressions[*mcfg.Source], err = jmespath.Compile(*mcfg.Source) + if err != nil { + return nil, errors.Wrap(err, "could not compile output source jmespath expression") + } + } + } return expressions, nil } @@ -100,7 +112,7 @@ type jsonStage struct { } // NewJSON creates a new json stage from a config. -func NewJSON(logger log.Logger, config interface{}) (Stage, error) { +func NewJSON(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) { cfg, err := newJSONConfig(config) if err != nil { return nil, err @@ -109,67 +121,32 @@ func NewJSON(logger log.Logger, config interface{}) (Stage, error) { if err != nil { return nil, err } - return &jsonStage{ + return withMetric(&jsonStage{ cfg: cfg, expressions: expressions, logger: log.With(logger, "component", "parser", "type", "json"), - }, nil -} - -func (j *jsonStage) getJSONString(expr *string, fallback string, data map[string]interface{}) (result string, ok bool) { - if expr == nil { - result, ok = data[fallback].(string) - if !ok { - level.Debug(j.logger).Log("msg", "field is not a string", "field", fallback) - } - } else { - var searchResult interface{} - searchResult, ok = j.getJSONValue(expr, data) - if !ok { - level.Debug(j.logger).Log("msg", "failed to search with jmespath expression", "expr", expr) - return - } - result, ok = searchResult.(string) - if !ok { - level.Debug(j.logger).Log("msg", "search result is not a string", "expr", *expr) - } - } - return -} - -func (j *jsonStage) getJSONValue(expr *string, data map[string]interface{}) (result interface{}, ok bool) { - var err error - ok = true - result, err = j.expressions[*expr].Search(data) - if err != nil { - level.Debug(j.logger).Log("msg", "failed to search with jmespath expression", "expr", expr) - ok = false - return - } - return + }, cfg.Metrics, registerer), nil } // Process implement a pipeline stage -func (j *jsonStage) Process(labels model.LabelSet, t *time.Time, entry *string) { +func (j *jsonStage) Process(labels model.LabelSet, t *time.Time, entry *string) Valuer { if entry == nil { level.Debug(j.logger).Log("msg", "cannot parse a nil entry") - return + return nil } - var data map[string]interface{} - if err := json.Unmarshal([]byte(*entry), &data); err != nil { - level.Debug(j.logger).Log("msg", "could not unmarshal json", "err", err) - return + valuer, err := newJSONValuer(*entry, j.expressions) + if err != nil { + level.Debug(j.logger).Log("msg", "failed to create json valuer", "err", err) + return nil } // parsing ts if j.cfg.Timestamp != nil { - if ts, ok := j.getJSONString(j.cfg.Timestamp.Source, "timestamp", data); ok { - parsedTs, err := time.Parse(j.cfg.Timestamp.Format, ts) - if err != nil { - level.Debug(j.logger).Log("msg", "failed to parse time", "err", err, "format", j.cfg.Timestamp.Format, "value", ts) - } else { - *t = parsedTs - } + ts, err := parseTimestamp(valuer, j.cfg.Timestamp) + if err != nil { + level.Debug(j.logger).Log("msg", "failed to parse timestamp", "err", err) + } else { + *t = ts } } @@ -179,8 +156,9 @@ func (j *jsonStage) Process(labels model.LabelSet, t *time.Time, entry *string) if lSrc != nil { src = lSrc.Source } - lValue, ok := j.getJSONString(src, lName, data) - if !ok { + lValue, err := valuer.getJSONString(src, lName) + if err != nil { + level.Debug(j.logger).Log("msg", "failed to get json string", "err", err) continue } labelValue := model.LabelValue(lValue) @@ -194,17 +172,82 @@ func (j *jsonStage) Process(labels model.LabelSet, t *time.Time, entry *string) // parsing output if j.cfg.Output != nil { - if jsonObj, ok := j.getJSONValue(j.cfg.Output.Source, data); ok && jsonObj != nil { - if s, ok := jsonObj.(string); ok { - *entry = s - return - } - b, err := json.Marshal(jsonObj) - if err != nil { - level.Debug(j.logger).Log("msg", "could not marshal output value", "err", err) - return - } - *entry = string(b) + output, err := parseOutput(valuer, j.cfg.Output) + if err != nil { + level.Debug(j.logger).Log("msg", "failed to parse output", "err", err) + } else { + *entry = output } } + return valuer +} + +type jsonValuer struct { + exprmap map[string]*jmespath.JMESPath + data map[string]interface{} +} + +func newJSONValuer(entry string, exprmap map[string]*jmespath.JMESPath) (*jsonValuer, error) { + var data map[string]interface{} + if err := json.Unmarshal([]byte(entry), &data); err != nil { + return nil, err + } + return &jsonValuer{ + data: data, + exprmap: exprmap, + }, nil +} + +func (v *jsonValuer) Value(expr *string) (interface{}, error) { + return v.exprmap[*expr].Search(v.data) +} + +func (v *jsonValuer) getJSONString(expr *string, fallback string) (result string, err error) { + var ok bool + if expr == nil { + result, ok = v.data[fallback].(string) + if !ok { + return result, fmt.Errorf("%s is not a string but %T", fallback, v.data[fallback]) + } + return + } + var searchResult interface{} + if searchResult, err = v.Value(expr); err != nil { + return + } + if result, ok = searchResult.(string); !ok { + return result, fmt.Errorf("%s is not a string but %T", *expr, searchResult) + } + + return +} + +func parseOutput(v Valuer, cfg *JSONOutput) (string, error) { + jsonObj, err := v.Value(cfg.Source) + if err != nil { + return "", errors.Wrap(err, "failed to fetch json value") + } + if jsonObj == nil { + return "", errors.New("json value is nil") + } + if s, ok := jsonObj.(string); ok { + return s, nil + } + b, err := json.Marshal(jsonObj) + if err != nil { + return "", errors.Wrap(err, "could not marshal output value") + } + return string(b), nil +} + +func parseTimestamp(v *jsonValuer, cfg *JSONTimestamp) (time.Time, error) { + ts, err := v.getJSONString(cfg.Source, "timestamp") + if err != nil { + return time.Time{}, err + } + parsedTs, err := time.Parse(cfg.Format, ts) + if err != nil { + return time.Time{}, errors.Wrapf(err, "failed to parse time format:%s value:%s", cfg.Format, ts) + } + return parsedTs, nil } diff --git a/pkg/logentry/stages/json_test.go b/pkg/logentry/stages/json_test.go index 1d797dbc..ee4aa9d3 100644 --- a/pkg/logentry/stages/json_test.go +++ b/pkg/logentry/stages/json_test.go @@ -54,10 +54,6 @@ func TestYamlMapStructure(t *testing.T) { } } -func String(s string) *string { - return &s -} - func TestJSONConfig_validate(t *testing.T) { t.Parallel() tests := map[string]struct { @@ -316,7 +312,7 @@ func TestJSONParser_Parse(t *testing.T) { tt := tt t.Run(tName, func(t *testing.T) { t.Parallel() - p, err := NewJSON(util.Logger, tt.config) + p, err := NewJSON(util.Logger, tt.config, nil) if err != nil { t.Fatalf("failed to create json parser: %s", err) } diff --git a/pkg/logentry/stages/metrics.go b/pkg/logentry/stages/metrics.go new file mode 100644 index 00000000..7e996930 --- /dev/null +++ b/pkg/logentry/stages/metrics.go @@ -0,0 +1,103 @@ +package stages + +import ( + "strings" + "time" + + "github.com/grafana/loki/pkg/logentry/metric" + "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus/common/model" +) + +const ( + MetricTypeCounter = "counter" + MetricTypeGauge = "gauge" + MetricTypeHistogram = "histogram" +) + +type MetricConfig struct { + MetricType string `mapstructure:"type"` + Description string `mapstructure:"description"` + Source *string `mapstructure:"source"` + Buckets []float64 `mapstructure:"buckets"` +} + +type MetricsConfig map[string]MetricConfig + +type Valuer interface { + Value(source *string) (interface{}, error) +} + +type StageValuer interface { + Process(labels model.LabelSet, time *time.Time, entry *string) Valuer +} + +func withMetric(s StageValuer, cfg MetricsConfig, registry prometheus.Registerer) Stage { + if registry == nil { + return StageFunc(func(labels model.LabelSet, time *time.Time, entry *string) { + _ = s.Process(labels, time, entry) + }) + } + metricStage := newMetric(cfg, registry) + return StageFunc(func(labels model.LabelSet, time *time.Time, entry *string) { + valuer := s.Process(labels, time, entry) + if valuer != nil { + metricStage.process(valuer, labels) + } + }) +} + +func newMetric(cfgs MetricsConfig, registry prometheus.Registerer) *metricStage { + metrics := map[string]prometheus.Collector{} + for name, config := range cfgs { + var collector prometheus.Collector + + switch strings.ToLower(config.MetricType) { + case MetricTypeCounter: + collector = metric.NewCounters(name, config.Description) + case MetricTypeGauge: + collector = metric.NewGauges(name, config.Description) + case MetricTypeHistogram: + collector = metric.NewHistograms(name, config.Description, config.Buckets) + } + if collector != nil { + registry.MustRegister(collector) + metrics[name] = collector + } + } + return &metricStage{ + cfg: cfgs, + metrics: metrics, + } +} + +type metricStage struct { + cfg MetricsConfig + metrics map[string]prometheus.Collector +} + +func (m *metricStage) process(v Valuer, labels model.LabelSet) { + for name, collector := range m.metrics { + switch vec := collector.(type) { + case metric.Counters: + recordCounter(vec.With(labels), v, m.cfg[name]) + case metric.Gauges: + recordGauge(vec.With(labels), v, m.cfg[name]) + case metric.Histograms: + recordHistogram(vec.With(labels), v, m.cfg[name]) + } + } +} + +func recordCounter(counter prometheus.Counter, v Valuer, cfg MetricConfig) { + +} + +func recordGauge(counter prometheus.Gauge, v Valuer, cfg MetricConfig) { + +} + +func recordHistogram(counter prometheus.Histogram, v Valuer, cfg MetricConfig) { + +} diff --git a/pkg/logentry/stages/stage.go b/pkg/logentry/stages/stage.go index f9c319d7..82f5ea90 100644 --- a/pkg/logentry/stages/stage.go +++ b/pkg/logentry/stages/stage.go @@ -11,3 +11,11 @@ import ( type Stage interface { Process(labels model.LabelSet, time *time.Time, entry *string) } + +// StageFunc is modelled on http.HandlerFunc. +type StageFunc func(labels model.LabelSet, time *time.Time, entry *string) + +// Process implements EntryHandler. +func (s StageFunc) Process(labels model.LabelSet, time *time.Time, entry *string) { + s(labels, time, entry) +} diff --git a/pkg/logentry/stages/util_test.go b/pkg/logentry/stages/util_test.go index 26bd2f76..842249a4 100644 --- a/pkg/logentry/stages/util_test.go +++ b/pkg/logentry/stages/util_test.go @@ -37,3 +37,7 @@ func assertLabels(t *testing.T, expect map[string]string, got model.LabelSet) { assert.Equal(t, model.LabelValue(v), gotV, "mismatch label value") } } + +func String(s string) *string { + return &s +} -- GitLab