diff --git a/pkg/logentry/metric/counters_test.go b/pkg/logentry/metric/counters_test.go deleted file mode 100644 index 7bb4c4b98f606ff05d47669a24fe1b1cec406b90..0000000000000000000000000000000000000000 --- a/pkg/logentry/metric/counters_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package metric - -import ( - "strings" - "sync" - "testing" - "time" - - "github.com/prometheus/client_golang/prometheus" - - testutil "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/prometheus/common/model" -) - -var expected = `# HELP log_entries_total the total count of log entries -# TYPE log_entries_total counter -log_entries_total 10.0 -log_entries_total{foo="bar"} 5.0 -log_entries_total{bar="foo"} 5.0 -log_entries_total{bar="foo",foo="bar"} 5.0 -` - -func Test_logCount(t *testing.T) { - t.Parallel() - reg := prometheus.NewRegistry() - handler := LogCount(reg) - - workerCount := 5 - var wg sync.WaitGroup - for i := 0; i < workerCount; i++ { - wg.Add(1) - go func() { - defer wg.Done() - _ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{}), time.Now(), "") - _ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"foo": "bar"}), time.Now(), "") - _ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"bar": "foo"}), time.Now(), "") - _ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"bar": "foo", "foo": "bar"}), time.Now(), "") - _ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{}), time.Now(), "") - - }() - } - wg.Wait() - - if err := testutil.GatherAndCompare(reg, strings.NewReader(expected), "log_entries_total"); err != nil { - t.Fatalf("missmatch metrics: %v", err) - } - -} diff --git a/pkg/logentry/metric/entries_test.go b/pkg/logentry/metric/entries_test.go new file mode 100644 index 0000000000000000000000000000000000000000..41dde2989fcc72f0fa940aa499b954d72db086bc --- /dev/null +++ b/pkg/logentry/metric/entries_test.go @@ -0,0 +1,83 @@ +package metric + +import ( + "strings" + "sync" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + + testutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/common/model" +) + +const expectedCount = `# HELP log_entries_total the total count of log entries +# TYPE log_entries_total counter +log_entries_total 10.0 +log_entries_total{foo="bar"} 5.0 +log_entries_total{bar="foo"} 5.0 +log_entries_total{bar="foo",foo="bar"} 5.0 +` + +func Test_LogCount(t *testing.T) { + t.Parallel() + reg := prometheus.NewRegistry() + handler := LogCount(reg) + + workerCount := 5 + var wg sync.WaitGroup + for i := 0; i < workerCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{}), time.Now(), "") + _ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"foo": "bar"}), time.Now(), "") + _ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"bar": "foo"}), time.Now(), "") + _ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"bar": "foo", "foo": "bar"}), time.Now(), "") + _ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{}), time.Now(), "") + + }() + } + wg.Wait() + + if err := testutil.GatherAndCompare(reg, strings.NewReader(expectedCount), "log_entries_total"); err != nil { + t.Fatalf("missmatch metrics: %v", err) + } + +} + +const expectedSize = `# HELP log_entries_bytes the total count of bytes +# TYPE log_entries_bytes counter +log_entries_bytes 35.0 +log_entries_bytes{foo="bar"} 15.0 +log_entries_bytes{bar="foo"} 10.0 +log_entries_bytes{bar="foo",foo="bar"} 15.0 +` + +func Test_LogSize(t *testing.T) { + t.Parallel() + reg := prometheus.NewRegistry() + handler := LogSize(reg) + + workerCount := 5 + var wg sync.WaitGroup + for i := 0; i < workerCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{}), time.Now(), "foo") + _ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"foo": "bar"}), time.Now(), "bar") + _ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"bar": "foo"}), time.Now(), "fu") + _ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"bar": "foo", "foo": "bar"}), time.Now(), "baz") + _ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{}), time.Now(), "more") + + }() + } + wg.Wait() + + if err := testutil.GatherAndCompare(reg, strings.NewReader(expectedSize), "log_entries_bytes"); err != nil { + t.Fatalf("missmatch metrics: %v", err) + } + +} diff --git a/pkg/logentry/stages/json.go b/pkg/logentry/stages/json.go index eb1603ea3537653e7b35776f20d4ba830965ed2c..1857d3ea346796c9d0a40d9c7dcf70ec969fca11 100644 --- a/pkg/logentry/stages/json.go +++ b/pkg/logentry/stages/json.go @@ -49,7 +49,7 @@ func newJSONConfig(config interface{}) (*JSONConfig, error) { // validate the config and returns a map of necessary jmespath expressions. func (c *JSONConfig) validate() (map[string]*jmespath.JMESPath, error) { - if c.Output == nil && len(c.Labels) == 0 && c.Timestamp == nil { + if c.Output == nil && len(c.Labels) == 0 && c.Timestamp == nil && len(c.Metrics) == 0 { return nil, errors.New("empty json parser configuration") } diff --git a/pkg/logentry/stages/json_test.go b/pkg/logentry/stages/json_test.go index ee4aa9d35fe574055e561154d1169e07622134aa..385f736cefa8efd42af93cc7a7ce88d6aaf147f2 100644 --- a/pkg/logentry/stages/json_test.go +++ b/pkg/logentry/stages/json_test.go @@ -189,6 +189,11 @@ var logFixture = ` "app":"loki", "component": ["parser","type"], "level" : "WARN", + "numeric": { + "float": 12.34, + "integer": 123, + "string": "123" + }, "nested" : {"child":"value"}, "message" : "this is a log line", "complex" : { diff --git a/pkg/logentry/stages/metrics.go b/pkg/logentry/stages/metrics.go index 7e9969308a6ce4cba23eae39a214a627b7e50e7f..0344f50edd47d06af9a4842909f5380904711e1f 100644 --- a/pkg/logentry/stages/metrics.go +++ b/pkg/logentry/stages/metrics.go @@ -1,6 +1,9 @@ package stages import ( + "fmt" + "math" + "strconv" "strings" "time" @@ -80,24 +83,79 @@ type metricStage struct { func (m *metricStage) process(v Valuer, labels model.LabelSet) { for name, collector := range m.metrics { switch vec := collector.(type) { - case metric.Counters: + case *metric.Counters: recordCounter(vec.With(labels), v, m.cfg[name]) - case metric.Gauges: + case *metric.Gauges: recordGauge(vec.With(labels), v, m.cfg[name]) - case metric.Histograms: + case *metric.Histograms: recordHistogram(vec.With(labels), v, m.cfg[name]) } } } func recordCounter(counter prometheus.Counter, v Valuer, cfg MetricConfig) { - + unk, err := v.Value(cfg.Source) + if err != nil { + return + } + f, err := getFloat(unk) + if err != nil || f < 0 { + return + } + counter.Add(f) } -func recordGauge(counter prometheus.Gauge, v Valuer, cfg MetricConfig) { - +func recordGauge(gauge prometheus.Gauge, v Valuer, cfg MetricConfig) { + unk, err := v.Value(cfg.Source) + if err != nil { + return + } + f, err := getFloat(unk) + if err != nil { + return + } + gauge.Add(f) } -func recordHistogram(counter prometheus.Histogram, v Valuer, cfg MetricConfig) { +func recordHistogram(histogram prometheus.Histogram, v Valuer, cfg MetricConfig) { + unk, err := v.Value(cfg.Source) + if err != nil { + return + } + f, err := getFloat(unk) + if err != nil { + return + } + histogram.Observe(f) +} +func getFloat(unk interface{}) (float64, error) { + + switch i := unk.(type) { + case float64: + return i, nil + case float32: + return float64(i), nil + case int64: + return float64(i), nil + case int32: + return float64(i), nil + case int: + return float64(i), nil + case uint64: + return float64(i), nil + case uint32: + return float64(i), nil + case uint: + return float64(i), nil + case string: + return strconv.ParseFloat(i, 64) + case bool: + if i { + return float64(1), nil + } + return float64(0), nil + default: + return math.NaN(), fmt.Errorf("Can't convert %v to float64", unk) + } } diff --git a/pkg/logentry/stages/metrics_test.go b/pkg/logentry/stages/metrics_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b7a624ebbc4ec47ea6344415e910d1e682635e6c --- /dev/null +++ b/pkg/logentry/stages/metrics_test.go @@ -0,0 +1,133 @@ +package stages + +import ( + "strings" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/prometheus/client_golang/prometheus" + testutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/common/model" +) + +var labelFoo = model.LabelSet(map[model.LabelName]model.LabelValue{"foo": "bar", "bar": "foo"}) +var labelFu = model.LabelSet(map[model.LabelName]model.LabelValue{"fu": "baz", "baz": "fu"}) + +func Test_withMetric(t *testing.T) { + cfg := map[string]interface{}{ + "metrics": map[string]interface{}{ + "total_keys": map[string]interface{}{ + "type": "Counter", + "source": "length(keys(@))", + "description": "the total keys per doc", + }, + "keys_per_line": map[string]interface{}{ + "type": "Histogram", + "source": "length(keys(@))", + "description": "keys per doc", + "buckets": []float64{1, 3, 5, 10}, + }, + "numeric_float": map[string]interface{}{ + "type": "Gauge", + "source": "numeric.float", + "description": "numeric_float", + }, + "numeric_integer": map[string]interface{}{ + "type": "Gauge", + "source": "numeric.integer", + "description": "numeric.integer", + }, + "numeric_string": map[string]interface{}{ + "type": "Gauge", + "source": "numeric.string", + "description": "numeric.string", + }, + "contains_warn": map[string]interface{}{ + "type": "Counter", + "source": "contains(values(@),'WARN')", + "description": "contains_warn", + }, + "contains_false": map[string]interface{}{ + "type": "Counter", + "source": "contains(keys(@),'nope')", + "description": "contains_false", + }, + "unconvertible": map[string]interface{}{ + "type": "Counter", + "source": "values(@)", + "description": "unconvertible", + }, + }, + } + registry := prometheus.NewRegistry() + metricStage, err := NewJSON(util.Logger, cfg, registry) + if err != nil { + t.Fatalf("failed to create stage with metrics: %v", err) + } + var ts = time.Now() + var entry = logFixture + + metricStage.Process(labelFoo, &ts, &entry) + metricStage.Process(labelFu, &ts, &entry) + + if err := testutil.GatherAndCompare(registry, + strings.NewReader(goldenMetrics), metricNames(cfg)...); err != nil { + t.Fatalf("missmatch metrics: %v", err) + } +} + +func metricNames(cfg map[string]interface{}) []string { + metrics := cfg["metrics"].(map[string]interface{}) + result := make([]string, 0, len(cfg)) + for name := range metrics { + result = append(result, name) + } + return result +} + +const goldenMetrics = `# HELP contains_false contains_false +# TYPE contains_false counter +contains_false{bar="foo",foo="bar"} 0.0 +contains_false{baz="fu",fu="baz"} 0.0 +# HELP contains_warn contains_warn +# TYPE contains_warn counter +contains_warn{bar="foo",foo="bar"} 1.0 +contains_warn{baz="fu",fu="baz"} 1.0 +# HELP keys_per_line keys per doc +# TYPE keys_per_line histogram +keys_per_line_bucket{bar="foo",foo="bar",le="1.0"} 0.0 +keys_per_line_bucket{bar="foo",foo="bar",le="3.0"} 0.0 +keys_per_line_bucket{bar="foo",foo="bar",le="5.0"} 0.0 +keys_per_line_bucket{bar="foo",foo="bar",le="10.0"} 1.0 +keys_per_line_bucket{bar="foo",foo="bar",le="+Inf"} 1.0 +keys_per_line_sum{bar="foo",foo="bar"} 8.0 +keys_per_line_count{bar="foo",foo="bar"} 1.0 +keys_per_line_bucket{baz="fu",fu="baz",le="1.0"} 0.0 +keys_per_line_bucket{baz="fu",fu="baz",le="3.0"} 0.0 +keys_per_line_bucket{baz="fu",fu="baz",le="5.0"} 0.0 +keys_per_line_bucket{baz="fu",fu="baz",le="10.0"} 1.0 +keys_per_line_bucket{baz="fu",fu="baz",le="+Inf"} 1.0 +keys_per_line_sum{baz="fu",fu="baz"} 8.0 +keys_per_line_count{baz="fu",fu="baz"} 1.0 +# HELP numeric_float numeric_float +# TYPE numeric_float gauge +numeric_float{bar="foo",foo="bar"} 12.34 +numeric_float{baz="fu",fu="baz"} 12.34 +# HELP numeric_integer numeric.integer +# TYPE numeric_integer gauge +numeric_integer{bar="foo",foo="bar"} 123.0 +numeric_integer{baz="fu",fu="baz"} 123.0 +# HELP numeric_string numeric.string +# TYPE numeric_string gauge +numeric_string{bar="foo",foo="bar"} 123.0 +numeric_string{baz="fu",fu="baz"} 123.0 +# HELP total_keys the total keys per doc +# TYPE total_keys counter +total_keys{bar="foo",foo="bar"} 8.0 +total_keys{baz="fu",fu="baz"} 8.0 +# HELP unconvertible unconvertible +# TYPE unconvertible counter +unconvertible{bar="foo",foo="bar"} 0.0 +unconvertible{baz="fu",fu="baz"} 0.0 +`