diff --git a/pkg/logentry/stages/configs.go b/pkg/logentry/stages/configs.go deleted file mode 100644 index 1b4b85a23cda33ad4e1d5f4b83387de2df9ed921..0000000000000000000000000000000000000000 --- a/pkg/logentry/stages/configs.go +++ /dev/null @@ -1,7 +0,0 @@ -package stages - -const ( - MetricTypeCounter = "counter" - MetricTypeGauge = "gauge" - MetricTypeHistogram = "histogram" -) diff --git a/pkg/logentry/stages/extensions.go b/pkg/logentry/stages/extensions.go index 27fb392e82f25ecb7112f67e81d849cf025e9b7a..9f2030c414bf6c4f0e4a9a2bff206514c653916a 100644 --- a/pkg/logentry/stages/extensions.go +++ b/pkg/logentry/stages/extensions.go @@ -8,7 +8,7 @@ import ( const RFC3339Nano = "RFC3339Nano" // NewDocker creates a Docker json log format specific pipeline stage. -func NewDocker(logger log.Logger, jobName string, registerer prometheus.Registerer) (Stage, error) { +func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, error) { stages := PipelineStages{ PipelineStage{ StageTypeJSON: JSONConfig{ @@ -32,12 +32,11 @@ func NewDocker(logger log.Logger, jobName string, registerer prometheus.Register "output", }, }} - - return NewPipeline(logger, stages, jobName+"_docker", registerer) + return NewPipeline(logger, stages, nil, registerer) } // NewCRI creates a CRI format specific pipeline stage -func NewCRI(logger log.Logger, jobName string, registerer prometheus.Registerer) (Stage, error) { +func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error) { stages := PipelineStages{ PipelineStage{ StageTypeRegex: RegexConfig{ @@ -61,5 +60,5 @@ func NewCRI(logger log.Logger, jobName string, registerer prometheus.Registerer) }, }, } - return NewPipeline(logger, stages, jobName+"_cri", registerer) + return NewPipeline(logger, stages, nil, registerer) } diff --git a/pkg/logentry/stages/extensions_test.go b/pkg/logentry/stages/extensions_test.go index 9f44d0a8f65efba0ed53cc70c422db192ab3a637..f544614cf90106df40de9347a3afaaa837893477 100644 --- a/pkg/logentry/stages/extensions_test.go +++ b/pkg/logentry/stages/extensions_test.go @@ -65,7 +65,7 @@ func TestNewDocker(t *testing.T) { tt := tt t.Run(tName, func(t *testing.T) { t.Parallel() - p, err := NewDocker(util.Logger, "test", prometheus.DefaultRegisterer) + p, err := NewDocker(util.Logger, prometheus.DefaultRegisterer) if err != nil { t.Fatalf("failed to create Docker parser: %s", err) } @@ -141,7 +141,7 @@ func TestNewCri(t *testing.T) { tt := tt t.Run(tName, func(t *testing.T) { t.Parallel() - p, err := NewCRI(util.Logger, "test", prometheus.DefaultRegisterer) + p, err := NewCRI(util.Logger, prometheus.DefaultRegisterer) if err != nil { t.Fatalf("failed to create CRI parser: %s", err) } diff --git a/pkg/logentry/stages/json.go b/pkg/logentry/stages/json.go index a7df01505d2e01909475dc4e5bf99aa2a0bb5cc5..2458d7440843df9002f4ebddde76493f5f46fb57 100644 --- a/pkg/logentry/stages/json.go +++ b/pkg/logentry/stages/json.go @@ -21,6 +21,7 @@ const ( ErrEmptyJSONStageConfig = "empty json stage configuration" ) +// JSONConfig represents a JSON Stage configuration type JSONConfig struct { Expressions map[string]string `mapstructure:"expressions"` } @@ -52,14 +53,14 @@ func validateJSONConfig(c *JSONConfig) (map[string]*jmespath.JMESPath, error) { return expressions, nil } -// jsonStage extracts log data via json parsing. +// jsonStage sets extracted data using JMESPath expressions type jsonStage struct { cfg *JSONConfig expressions map[string]*jmespath.JMESPath logger log.Logger } -// newJSONStage creates a new json mutator from a config. +// newJSONStage creates a new json pipeline stage from a config. func newJSONStage(logger log.Logger, config interface{}) (*jsonStage, error) { cfg, err := parseJSONConfig(config) if err != nil { @@ -85,7 +86,7 @@ func parseJSONConfig(config interface{}) (*JSONConfig, error) { return cfg, nil } -// Process implements Mutator +// Process implements Stage func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { if entry == nil { level.Debug(j.logger).Log("msg", "cannot parse a nil entry") diff --git a/pkg/logentry/stages/json_test.go b/pkg/logentry/stages/json_test.go index ab8173fdbd8db7f0be5ffeebc4d194c798f15bd6..772a506c191ce8f4ac1e9e03b3a8c442d6b24bd0 100644 --- a/pkg/logentry/stages/json_test.go +++ b/pkg/logentry/stages/json_test.go @@ -174,7 +174,7 @@ func TestJSONParser_Parse(t *testing.T) { tt := tt t.Run(tName, func(t *testing.T) { t.Parallel() - p, err := New(util.Logger, "test", StageTypeJSON, tt.config, nil) + p, err := New(util.Logger, nil, StageTypeJSON, tt.config, nil) if err != nil { t.Fatalf("failed to create json parser: %s", err) } diff --git a/pkg/logentry/stages/labels.go b/pkg/logentry/stages/labels.go index 504d8c1980666642a7302cdd292a3e21480344ff..d5f75ba53371e312632f19b365049451b398075c 100644 --- a/pkg/logentry/stages/labels.go +++ b/pkg/logentry/stages/labels.go @@ -20,6 +20,7 @@ const ( // LabelsConfig is a set of labels to be extracted type LabelsConfig map[string]*string +// validateLabelsConfig validates the Label stage configuration func validateLabelsConfig(c LabelsConfig) error { if c == nil { return errors.New(ErrEmptyLabelStageConfig) @@ -37,8 +38,8 @@ func validateLabelsConfig(c LabelsConfig) error { return nil } -// newLabel creates a new label stage to set labels from extracted data -func newLabel(logger log.Logger, configs interface{}) (*labelStage, error) { +// newLabelStage creates a new label stage to set labels from extracted data +func newLabelStage(logger log.Logger, configs interface{}) (*labelStage, error) { cfgs := &LabelsConfig{} err := mapstructure.Decode(configs, cfgs) if err != nil { @@ -54,11 +55,13 @@ func newLabel(logger log.Logger, configs interface{}) (*labelStage, error) { }, nil } +// labelStage sets labels from extracted data type labelStage struct { cfgs LabelsConfig logger log.Logger } +// Process implements Stage func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { for lName, lSrc := range l.cfgs { if _, ok := extracted[*lSrc]; ok { @@ -66,6 +69,7 @@ func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interfa s, err := getString(lValue) if err != nil { level.Debug(l.logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue).String()) + continue } labelValue := model.LabelValue(s) if !labelValue.IsValid() { diff --git a/pkg/logentry/stages/labels_test.go b/pkg/logentry/stages/labels_test.go index 75a4b1cc70e07a03b3917557a306bfabcb971326..af594d73bde67894a3393e12bfb8dbbc93413f20 100644 --- a/pkg/logentry/stages/labels_test.go +++ b/pkg/logentry/stages/labels_test.go @@ -33,7 +33,7 @@ var testLabelsLogLine = ` ` func TestLabelsPipeline_Labels(t *testing.T) { - pl, err := NewPipeline(util.Logger, loadConfig(testLabelsYaml), "test", prometheus.DefaultRegisterer) + pl, err := NewPipeline(util.Logger, loadConfig(testLabelsYaml), nil, prometheus.DefaultRegisterer) if err != nil { t.Fatal(err) } @@ -108,4 +108,49 @@ func TestLabels(t *testing.T) { } } -//TODO test label processing +func TestLabelStage_Process(t *testing.T) { + sourceName := "diff_source" + tests := map[string]struct { + config LabelsConfig + extractedData map[string]interface{} + inputLabels model.LabelSet + expectedLabels model.LabelSet + }{ + "extract_success": { + LabelsConfig{ + "testLabel": nil, + }, + map[string]interface{}{ + "testLabel": "testValue", + }, + model.LabelSet{}, + model.LabelSet{ + "testLabel": "testValue", + }, + }, + "different_source_name": { + LabelsConfig{ + "testLabel": &sourceName, + }, + map[string]interface{}{ + sourceName: "testValue", + }, + model.LabelSet{}, + model.LabelSet{ + "testLabel": "testValue", + }, + }, + } + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + t.Parallel() + st, err := newLabelStage(util.Logger, test.config) + if err != nil { + t.Fatal(err) + } + st.Process(test.inputLabels, test.extractedData, nil, nil) + assert.Equal(t, test.expectedLabels, test.expectedLabels) + }) + } +} diff --git a/pkg/logentry/stages/match.go b/pkg/logentry/stages/match.go index 493875bf1e6175748543d9ba445d0dffaaa27dfc..59895264138dfa3e1efbd1454e1da70320656533 100644 --- a/pkg/logentry/stages/match.go +++ b/pkg/logentry/stages/match.go @@ -15,23 +15,25 @@ import ( const ( ErrEmptyMatchStageConfig = "match stage config cannot be empty" - ErrPipelineNameRequired = "match stage must specify a pipeline name which is used in the exported metrics" + ErrPipelineNameRequired = "match stage pipeline name can be omitted but cannot be an empty string" ErrSelectorRequired = "selector statement required for match stage" ErrMatchRequiresStages = "match stage requires at least one additional stage to be defined in '- stages'" ErrSelectorSyntax = "invalid selector syntax for match stage" ) +// MatcherConfig contains the configuration for a matcherStage type MatcherConfig struct { - PipelineName string `mapstructure:"pipeline_name"` + PipelineName *string `mapstructure:"pipeline_name"` Selector string `mapstructure:"selector"` Stages PipelineStages `mapstructure:"stages"` } +// validateMatcherConfig validates the MatcherConfig for the matcherStage func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) { if cfg == nil { return nil, errors.New(ErrEmptyMatchStageConfig) } - if cfg.PipelineName == "" { + if cfg.PipelineName != nil && *cfg.PipelineName == "" { return nil, errors.New(ErrPipelineNameRequired) } if cfg.Selector == "" { @@ -47,7 +49,8 @@ func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) { return matchers, nil } -func newMatcherStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) { +// newMatcherStage creates a new matcherStage from config +func newMatcherStage(logger log.Logger, jobName *string, config interface{}, registerer prometheus.Registerer) (Stage, error) { cfg := &MatcherConfig{} err := mapstructure.Decode(config, cfg) if err != nil { @@ -58,9 +61,15 @@ func newMatcherStage(logger log.Logger, config interface{}, registerer prometheu return nil, err } - pl, err := NewPipeline(logger, cfg.Stages, cfg.PipelineName, registerer) + var nPtr *string + if cfg.PipelineName != nil && jobName != nil { + name := *jobName + "_" + *cfg.PipelineName + nPtr = &name + } + + pl, err := NewPipeline(logger, cfg.Stages, nPtr, registerer) if err != nil { - return nil, errors.Wrapf(err, "match stage %s failed to create pipeline", cfg.PipelineName) + return nil, errors.Wrapf(err, "match stage failed to create pipeline from config: %v", config) } return &matcherStage{ @@ -71,6 +80,7 @@ func newMatcherStage(logger log.Logger, config interface{}, registerer prometheu }, nil } +// matcherStage applies Label matchers to determine if the include stages should be run type matcherStage struct { cfgs *MatcherConfig matchers []*labels.Matcher @@ -78,6 +88,7 @@ type matcherStage struct { pipeline Stage } +// Process implements Stage func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { for _, filter := range m.matchers { if !filter.Matches(string(labels[model.LabelName(filter.Name)])) { diff --git a/pkg/logentry/stages/match_test.go b/pkg/logentry/stages/match_test.go index c8d66b24cbde5035ff10225ade50083d7e1c35c7..68e50ffb081dae50e327b8423cf2367957075a31 100644 --- a/pkg/logentry/stages/match_test.go +++ b/pkg/logentry/stages/match_test.go @@ -1,12 +1,14 @@ package stages import ( + "bytes" "fmt" "testing" "time" "github.com/cortexproject/cortex/pkg/util" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" ) @@ -19,7 +21,6 @@ pipeline_stages: - labels: app: - match: - pipeline_name: "app1" selector: "{app=\"loki\"}" stages: - json: @@ -57,20 +58,42 @@ var testMatchLogLineApp2 = ` ` func TestMatchPipeline(t *testing.T) { - pl, err := NewPipeline(util.Logger, loadConfig(testMatchYaml), "test", prometheus.DefaultRegisterer) + registry := prometheus.NewRegistry() + plName := "test_pipeline" + pl, err := NewPipeline(util.Logger, loadConfig(testMatchYaml), &plName, registry) if err != nil { t.Fatal(err) } lbls := model.LabelSet{} ts := time.Now() + // Process the first log line which should extract the output from the `message` field entry := testMatchLogLineApp1 extracted := map[string]interface{}{} pl.Process(lbls, extracted, &ts, &entry) assert.Equal(t, "app1 log line", entry) + + // Process the second log line which should extract the output from the `msg` field entry = testMatchLogLineApp2 extracted = map[string]interface{}{} pl.Process(lbls, extracted, &ts, &entry) assert.Equal(t, "app2 log line", entry) + + got, err := registry.Gather() + if err != nil { + t.Fatalf("gathering metrics failed: %s", err) + } + var gotBuf bytes.Buffer + enc := expfmt.NewEncoder(&gotBuf, expfmt.FmtText) + for _, mf := range got { + if err := enc.Encode(mf); err != nil { + t.Fatalf("encoding gathered metrics failed: %s", err) + } + } + gotStr := gotBuf.String() + // We should only get metrics from the main pipeline and the second match which defines the pipeline_name + assert.Contains(t, gotStr, "logentry_pipeline_duration_seconds_bucket{job_name=\"test_pipeline\"") + assert.Contains(t, gotStr, "logentry_pipeline_duration_seconds_bucket{job_name=\"test_pipeline_app2\"") + assert.NotContains(t, gotStr, "logentry_pipeline_duration_seconds_bucket{job_name=\"test_pipeline_app1\"") } func TestMatcher(t *testing.T) { @@ -103,7 +126,7 @@ func TestMatcher(t *testing.T) { // Build a match config which has a simple label stage that when matched will add the test_label to // the labels in the pipeline. matchConfig := MatcherConfig{ - "pl_name", + nil, tt.matcher, PipelineStages{ PipelineStage{ @@ -113,7 +136,7 @@ func TestMatcher(t *testing.T) { }, }, } - s, err := newMatcherStage(util.Logger, matchConfig, prometheus.DefaultRegisterer) + s, err := newMatcherStage(util.Logger, nil, matchConfig, prometheus.DefaultRegisterer) if (err != nil) != tt.wantErr { t.Errorf("withMatcher() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/pkg/logentry/stages/metrics.go b/pkg/logentry/stages/metrics.go index d1208fb94150be26ba07875c61724c9ced2ffdc9..b5149844d4898bbee2d5d995da83ac0c07d97c08 100644 --- a/pkg/logentry/stages/metrics.go +++ b/pkg/logentry/stages/metrics.go @@ -21,6 +21,10 @@ import ( const customPrefix = "promtail_custom_" const ( + MetricTypeCounter = "counter" + MetricTypeGauge = "gauge" + MetricTypeHistogram = "histogram" + ErrEmptyMetricsStageConfig = "empty metric stage configuration" ) @@ -51,8 +55,8 @@ func validateMetricsConfig(cfg MetricsConfig) error { return nil } -// newMetric creates a new set of metrics to process for each log entry -func newMetric(logger log.Logger, config interface{}, registry prometheus.Registerer) (*metricStage, error) { +// newMetricStage creates a new set of metrics to process for each log entry +func newMetricStage(logger log.Logger, config interface{}, registry prometheus.Registerer) (*metricStage, error) { cfgs := &MetricsConfig{} err := mapstructure.Decode(config, cfgs) if err != nil { @@ -95,12 +99,14 @@ func newMetric(logger log.Logger, config interface{}, registry prometheus.Regist }, nil } +// metricStage creates and updates prometheus metrics based on extracted pipeline data type metricStage struct { logger log.Logger cfg MetricsConfig metrics map[string]prometheus.Collector } +// Process implements Stage func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { for name, collector := range m.metrics { if v, ok := extracted[*m.cfg[name].Source]; ok { @@ -116,6 +122,7 @@ func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interf } } +// recordCounter will update a counter metric func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) { // If value matching is defined, make sure value matches. if counter.Cfg.Value != nil { @@ -144,6 +151,7 @@ func (m *metricStage) recordCounter(name string, counter *metric.Counters, label } } +// recordGauge will update a gauge metric func (m *metricStage) recordGauge(name string, gauge *metric.Gauges, labels model.LabelSet, v interface{}) { // If value matching is defined, make sure value matches. if gauge.Cfg.Value != nil { @@ -188,6 +196,7 @@ func (m *metricStage) recordGauge(name string, gauge *metric.Gauges, labels mode } } +// recordHistogram will update a Histogram metric func (m *metricStage) recordHistogram(name string, histogram *metric.Histograms, labels model.LabelSet, v interface{}) { // If value matching is defined, make sure value matches. if histogram.Cfg.Value != nil { @@ -210,6 +219,7 @@ func (m *metricStage) recordHistogram(name string, histogram *metric.Histograms, histogram.With(labels).Observe(f) } +// getFloat will take the provided value and return a float64 if possible func getFloat(unk interface{}) (float64, error) { switch i := unk.(type) { diff --git a/pkg/logentry/stages/metrics_test.go b/pkg/logentry/stages/metrics_test.go index cc47b3d9d4a1796966a7989a116eaeadb537e6a9..9d7c4b37a6d2a2fade781aa1cb8af1450dbd2e37 100644 --- a/pkg/logentry/stages/metrics_test.go +++ b/pkg/logentry/stages/metrics_test.go @@ -44,7 +44,7 @@ promtail_custom_loki_count 1.0 func TestMetricsPipeline(t *testing.T) { registry := prometheus.NewRegistry() - pl, err := NewPipeline(util.Logger, loadConfig(testMetricYaml), "test", registry) + pl, err := NewPipeline(util.Logger, loadConfig(testMetricYaml), nil, registry) if err != nil { t.Fatal(err) } @@ -150,15 +150,15 @@ func TestMetricStage_Process(t *testing.T) { } registry := prometheus.NewRegistry() - jsonStage, err := New(util.Logger, "test", StageTypeJSON, jsonConfig, registry) + jsonStage, err := New(util.Logger, nil, StageTypeJSON, jsonConfig, registry) if err != nil { t.Fatalf("failed to create stage with metrics: %v", err) } - regexStage, err := New(util.Logger, "test", StageTypeRegex, regexConfig, registry) + regexStage, err := New(util.Logger, nil, StageTypeRegex, regexConfig, registry) if err != nil { t.Fatalf("failed to create stage with metrics: %v", err) } - metricStage, err := New(util.Logger, "test", StageTypeMetric, metricsConfig, registry) + metricStage, err := New(util.Logger, nil, StageTypeMetric, metricsConfig, registry) if err != nil { t.Fatalf("failed to create stage with metrics: %v", err) } diff --git a/pkg/logentry/stages/output.go b/pkg/logentry/stages/output.go index ea49acd23c7c575a460d166591d5420d03c43e06..883928711a532543cbea70a04e36fae82128683d 100644 --- a/pkg/logentry/stages/output.go +++ b/pkg/logentry/stages/output.go @@ -22,6 +22,7 @@ type OutputConfig struct { Source string `mapstructure:"source"` } +// validateOutput validates the outputStage config func validateOutputConfig(cfg *OutputConfig) error { if cfg == nil { return errors.New(ErrEmptyOutputStageConfig) @@ -32,8 +33,8 @@ func validateOutputConfig(cfg *OutputConfig) error { return nil } -// newLabel creates a new set of metrics to process for each log entry -func newOutput(logger log.Logger, config interface{}) (*outputStage, error) { +// newOutputStage creates a new outputStage +func newOutputStage(logger log.Logger, config interface{}) (*outputStage, error) { cfg := &OutputConfig{} err := mapstructure.Decode(config, cfg) if err != nil { @@ -49,11 +50,13 @@ func newOutput(logger log.Logger, config interface{}) (*outputStage, error) { }, nil } +// outputStage will mutate the incoming entry and set it from extracted data type outputStage struct { cfgs *OutputConfig logger log.Logger } +// Process implements Stage func (o *outputStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { if o.cfgs == nil { return diff --git a/pkg/logentry/stages/output_test.go b/pkg/logentry/stages/output_test.go index 00c1f5dd7864aaa70ebbe66ba01186b13e36678e..8429dc7e18d71eb3c95d1b3f0994f33855e8578f 100644 --- a/pkg/logentry/stages/output_test.go +++ b/pkg/logentry/stages/output_test.go @@ -32,7 +32,7 @@ var testOutputLogLine = ` ` func TestPipeline_Output(t *testing.T) { - pl, err := NewPipeline(util.Logger, loadConfig(testOutputYaml), "test", prometheus.DefaultRegisterer) + pl, err := NewPipeline(util.Logger, loadConfig(testOutputYaml), nil, prometheus.DefaultRegisterer) if err != nil { t.Fatal(err) } @@ -77,4 +77,35 @@ func TestOutputValidation(t *testing.T) { } } -//TODO test label processing +func TestOutputStage_Process(t *testing.T) { + tests := map[string]struct { + config OutputConfig + extracted map[string]interface{} + expectedOutput string + }{ + "sets output": { + OutputConfig{ + Source: "out", + }, + map[string]interface{}{ + "something": "notimportant", + "out": "outmessage", + }, + "outmessage", + }, + } + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + t.Parallel() + st, err := newOutputStage(util.Logger, test.config) + if err != nil { + t.Fatal(err) + } + lbls := model.LabelSet{} + entry := "replaceme" + st.Process(lbls, test.extracted, nil, &entry) + assert.Equal(t, test.expectedOutput, entry) + }) + } +} diff --git a/pkg/logentry/stages/pipeline.go b/pkg/logentry/stages/pipeline.go index 87114ab79c26320031075fc7aa07deb1b1f81263..636fbb925a3ed3365a0094eb0bf0160dfe2035f4 100644 --- a/pkg/logentry/stages/pipeline.go +++ b/pkg/logentry/stages/pipeline.go @@ -7,21 +7,11 @@ import ( "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/promtail/api" ) -var ( - pipelineDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "logentry", - Name: "pipeline_duration_seconds", - Help: "Label and metric extraction pipeline processing time, in seconds", - Buckets: []float64{.000005, .000010, .000025, .000050, .000100, .000250, .000500, .001000, .002500, .005000, .010000, .025000}, - }, []string{"job_name"}) -) - // PipelineStages contains configuration for each stage within a pipeline type PipelineStages = []interface{} @@ -30,13 +20,30 @@ type PipelineStage = map[interface{}]interface{} // Pipeline pass down a log entry to each stage for mutation and/or label extraction. type Pipeline struct { - logger log.Logger - stages []Stage - jobName string + logger log.Logger + stages []Stage + jobName *string + plDuration *prometheus.HistogramVec } // NewPipeline creates a new log entry pipeline from a configuration -func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string, registerer prometheus.Registerer) (*Pipeline, error) { +func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, registerer prometheus.Registerer) (*Pipeline, error) { + hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "logentry", + Name: "pipeline_duration_seconds", + Help: "Label and metric extraction pipeline processing time, in seconds", + Buckets: []float64{.000005, .000010, .000025, .000050, .000100, .000250, .000500, .001000, .002500, .005000, .010000, .025000}, + }, []string{"job_name"}) + err := registerer.Register(hist) + if err != nil { + if existing, ok := err.(prometheus.AlreadyRegisteredError); ok { + hist = existing.ExistingCollector.(*prometheus.HistogramVec) + } else { + // Same behavior as MustRegister if the error is not for AlreadyRegistered + panic(err) + } + } + st := []Stage{} for _, s := range stgs { stage, ok := s.(PipelineStage) @@ -60,9 +67,10 @@ func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string, registe } } return &Pipeline{ - logger: log.With(logger, "component", "pipeline"), - stages: st, - jobName: jobName, + logger: log.With(logger, "component", "pipeline"), + stages: st, + jobName: jobName, + plDuration: hist, }, nil } @@ -75,7 +83,9 @@ func (p *Pipeline) Process(labels model.LabelSet, extracted map[string]interface } dur := time.Since(start).Seconds() level.Debug(p.logger).Log("msg", "finished processing log line", "labels", labels, "time", ts, "entry", entry, "duration_s", dur) - pipelineDuration.WithLabelValues(p.jobName).Observe(dur) + if p.jobName != nil { + p.plDuration.WithLabelValues(*p.jobName).Observe(dur) + } } // Wrap implements EntryMiddleware diff --git a/pkg/logentry/stages/pipeline_test.go b/pkg/logentry/stages/pipeline_test.go index b323e62819c979f8879e1ac006f20b50cb5880df..7ecf8719b3c03b4fa14a4542c48d705c54eb840e 100644 --- a/pkg/logentry/stages/pipeline_test.go +++ b/pkg/logentry/stages/pipeline_test.go @@ -24,7 +24,6 @@ var ( var testYaml = ` pipeline_stages: - match: - pipeline_name: "test_pipeline" selector: "{match=\"true\"}" stages: - docker: @@ -49,7 +48,7 @@ func loadConfig(yml string) PipelineStages { func TestNewPipeline(t *testing.T) { - p, err := NewPipeline(util.Logger, loadConfig(testYaml), "test", prometheus.DefaultRegisterer) + p, err := NewPipeline(util.Logger, loadConfig(testYaml), nil, prometheus.DefaultRegisterer) if err != nil { panic(err) } @@ -67,7 +66,7 @@ func TestPipeline_MultiStage(t *testing.T) { if err != nil { panic(err) } - p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), "test", prometheus.DefaultRegisterer) + p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), nil, prometheus.DefaultRegisterer) if err != nil { panic(err) } @@ -155,7 +154,7 @@ func BenchmarkPipeline(b *testing.B) { } for _, bm := range benchmarks { b.Run(bm.name, func(b *testing.B) { - pl, err := NewPipeline(bm.logger, bm.stgs, "test", prometheus.DefaultRegisterer) + pl, err := NewPipeline(bm.logger, bm.stgs, nil, prometheus.DefaultRegisterer) if err != nil { panic(err) } diff --git a/pkg/logentry/stages/regex.go b/pkg/logentry/stages/regex.go index c459194e890a7697ab0d7991c48f739b2fd4301c..10d8cc2f4213eee0fea5529107aa45dee2ffcb72 100644 --- a/pkg/logentry/stages/regex.go +++ b/pkg/logentry/stages/regex.go @@ -18,6 +18,7 @@ const ( ErrEmptyRegexStageConfig = "empty regex stage configuration" ) +// RegexConfig contains a regexStage configuration type RegexConfig struct { Expression string `mapstructure:"expression"` } @@ -40,14 +41,14 @@ func validateRegexConfig(c *RegexConfig) (*regexp.Regexp, error) { return expr, nil } -// regexStage mutates log entries using regex +// regexStage sets extracted data using regular expressions type regexStage struct { cfg *RegexConfig expression *regexp.Regexp logger log.Logger } -// newRegexStage creates a new regular expression Mutator. +// newRegexStage creates a newRegexStage func newRegexStage(logger log.Logger, config interface{}) (Stage, error) { cfg, err := parseRegexConfig(config) if err != nil { @@ -64,6 +65,7 @@ func newRegexStage(logger log.Logger, config interface{}) (Stage, error) { }, nil } +// parseRegexConfig processes an incoming configuration into a RegexConfig func parseRegexConfig(config interface{}) (*RegexConfig, error) { cfg := &RegexConfig{} err := mapstructure.Decode(config, cfg) diff --git a/pkg/logentry/stages/regex_test.go b/pkg/logentry/stages/regex_test.go index a33d2f9fffb35ffed665ce9413391ec51a3eecbd..830bc0159042c3e9755780d23a17278894d3fc9e 100644 --- a/pkg/logentry/stages/regex_test.go +++ b/pkg/logentry/stages/regex_test.go @@ -129,7 +129,7 @@ func TestRegexParser_Parse(t *testing.T) { tt := tt t.Run(tName, func(t *testing.T) { t.Parallel() - p, err := New(util.Logger, "test", StageTypeRegex, tt.config, nil) + p, err := New(util.Logger, nil, StageTypeRegex, tt.config, nil) if err != nil { t.Fatalf("failed to create regex parser: %s", err) } @@ -171,7 +171,7 @@ func BenchmarkRegexStage(b *testing.B) { } for _, bm := range benchmarks { b.Run(bm.name, func(b *testing.B) { - stage, err := New(util.Logger, "test", StageTypeRegex, bm.config, nil) + stage, err := New(util.Logger, nil, StageTypeRegex, bm.config, nil) if err != nil { panic(err) } diff --git a/pkg/logentry/stages/stage.go b/pkg/logentry/stages/stage.go index 939da50b2ffca0c12da1ec4252a257f41bc88222..f5f2d7f52b16394e4d38391215755723848bcba9 100644 --- a/pkg/logentry/stages/stage.go +++ b/pkg/logentry/stages/stage.go @@ -36,18 +36,18 @@ func (s StageFunc) Process(labels model.LabelSet, extracted map[string]interface } // New creates a new stage for the given type and configuration. -func New(logger log.Logger, jobName string, stageType string, +func New(logger log.Logger, jobName *string, stageType string, cfg interface{}, registerer prometheus.Registerer) (Stage, error) { var s Stage var err error switch stageType { case StageTypeDocker: - s, err = NewDocker(logger, jobName, registerer) + s, err = NewDocker(logger, registerer) if err != nil { return nil, err } case StageTypeCRI: - s, err = NewCRI(logger, jobName, registerer) + s, err = NewCRI(logger, registerer) if err != nil { return nil, err } @@ -62,27 +62,27 @@ func New(logger log.Logger, jobName string, stageType string, return nil, err } case StageTypeMetric: - s, err = newMetric(logger, cfg, registerer) + s, err = newMetricStage(logger, cfg, registerer) if err != nil { return nil, err } case StageTypeLabel: - s, err = newLabel(logger, cfg) + s, err = newLabelStage(logger, cfg) if err != nil { return nil, err } case StageTypeTimestamp: - s, err = newTimestamp(logger, cfg) + s, err = newTimestampStage(logger, cfg) if err != nil { return nil, err } case StageTypeOutput: - s, err = newOutput(logger, cfg) + s, err = newOutputStage(logger, cfg) if err != nil { return nil, err } case StageTypeMatch: - s, err = newMatcherStage(logger, cfg, registerer) + s, err = newMatcherStage(logger, jobName, cfg, registerer) if err != nil { return nil, err } diff --git a/pkg/logentry/stages/timestamp.go b/pkg/logentry/stages/timestamp.go index 8ea2163bbcd439f46e42466b43e51c428c870c8c..035288b1a14f84131d8511c3994b03eacfa8c825 100644 --- a/pkg/logentry/stages/timestamp.go +++ b/pkg/logentry/stages/timestamp.go @@ -23,6 +23,7 @@ type TimestampConfig struct { Format string `mapstructure:"format"` } +// validateTimestampConfig validates a timestampStage configuration func validateTimestampConfig(cfg *TimestampConfig) (string, error) { if cfg == nil { return "", errors.New(ErrEmptyTimestampStageConfig) @@ -37,8 +38,8 @@ func validateTimestampConfig(cfg *TimestampConfig) (string, error) { } -// newTimestamp creates a new timestamp extraction pipeline stage. -func newTimestamp(logger log.Logger, config interface{}) (*timestampStage, error) { +// newTimestampStage creates a new timestamp extraction pipeline stage. +func newTimestampStage(logger log.Logger, config interface{}) (*timestampStage, error) { cfg := &TimestampConfig{} err := mapstructure.Decode(config, cfg) if err != nil { @@ -55,12 +56,14 @@ func newTimestamp(logger log.Logger, config interface{}) (*timestampStage, error }, nil } +// timestampStage will set the timestamp using extracted data type timestampStage struct { cfgs *TimestampConfig logger log.Logger format string } +// Process implements Stage func (ts *timestampStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { if ts.cfgs == nil { return diff --git a/pkg/logentry/stages/timestamp_test.go b/pkg/logentry/stages/timestamp_test.go index 97b35bd97ddd936f208ac7bd239c9efba6c868d5..a9b32dd655a238283716826fd818d8b97bb8421b 100644 --- a/pkg/logentry/stages/timestamp_test.go +++ b/pkg/logentry/stages/timestamp_test.go @@ -23,7 +23,7 @@ pipeline_stages: var testTimestampLogLine = ` { - "time":"2012-11-01T22:08:41+00:00", + "time":"2012-11-01T22:08:41-04:00", "app":"loki", "component": ["parser","type"], "level" : "WARN" @@ -31,7 +31,7 @@ var testTimestampLogLine = ` ` func TestTimestampPipeline(t *testing.T) { - pl, err := NewPipeline(util.Logger, loadConfig(testTimestampYaml), "test", prometheus.DefaultRegisterer) + pl, err := NewPipeline(util.Logger, loadConfig(testTimestampYaml), nil, prometheus.DefaultRegisterer) if err != nil { t.Fatal(err) } @@ -40,7 +40,7 @@ func TestTimestampPipeline(t *testing.T) { entry := testTimestampLogLine extracted := map[string]interface{}{} pl.Process(lbls, extracted, &ts, &entry) - assert.Equal(t, time.Date(2012, 11, 01, 22, 8, 41, 0, time.FixedZone("", 0)), ts) + assert.Equal(t, time.Date(2012, 11, 01, 22, 8, 41, 0, time.FixedZone("", -4*60*60)), ts) } func TestTimestampValidation(t *testing.T) { @@ -100,4 +100,37 @@ func TestTimestampValidation(t *testing.T) { } } -//TODO process tests +func TestTimestampStage_Process(t *testing.T) { + tests := map[string]struct { + config TimestampConfig + extracted map[string]interface{} + expected time.Time + }{ + "set success": { + TimestampConfig{ + Source: "ts", + Format: time.RFC3339, + }, + map[string]interface{}{ + "somethigelse": "notimportant", + "ts": "2106-01-02T23:04:05-04:00", + }, + time.Date(2106, 01, 02, 23, 04, 05, 0, time.FixedZone("", -4*60*60)), + }, + } + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + t.Parallel() + st, err := newTimestampStage(util.Logger, test.config) + if err != nil { + t.Fatal(err) + } + ts := time.Now() + lbls := model.LabelSet{} + st.Process(lbls, test.extracted, &ts, nil) + assert.Equal(t, test.expected, ts) + + }) + } +} diff --git a/pkg/logentry/stages/util.go b/pkg/logentry/stages/util.go index 441a7ead0ea391829886d03c24b0412de431021a..baa13a189a2c3c27c2177d7fe050ac447cdb49c0 100644 --- a/pkg/logentry/stages/util.go +++ b/pkg/logentry/stages/util.go @@ -34,6 +34,7 @@ func convertDateLayout(predef string) string { } } +// getString will convert the input variable to a string if possible func getString(unk interface{}) (string, error) { switch i := unk.(type) { diff --git a/pkg/promtail/targets/filetargetmanager.go b/pkg/promtail/targets/filetargetmanager.go index 7d8e417192f1ddae9cd395801066c4d706b2a0df..3419a1eb83041d47e3929e0cbe31351da01572a8 100644 --- a/pkg/promtail/targets/filetargetmanager.go +++ b/pkg/promtail/targets/filetargetmanager.go @@ -77,7 +77,7 @@ func NewFileTargetManager( config := map[string]sd_config.ServiceDiscoveryConfig{} for _, cfg := range scrapeConfigs { registerer := prometheus.DefaultRegisterer - pipeline, err := stages.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, cfg.JobName, registerer) + pipeline, err := stages.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, &cfg.JobName, registerer) if err != nil { return nil, err } @@ -87,14 +87,14 @@ func NewFileTargetManager( switch cfg.EntryParser { case api.CRI: level.Warn(logger).Log("msg", "WARNING!!! entry_parser config is deprecated, please change to pipeline_stages") - cri, err := stages.NewCRI(logger, cfg.JobName, registerer) + cri, err := stages.NewCRI(logger, registerer) if err != nil { return nil, err } pipeline.AddStage(cri) case api.Docker: level.Warn(logger).Log("msg", "WARNING!!! entry_parser config is deprecated, please change to pipeline_stages") - docker, err := stages.NewDocker(logger, cfg.JobName, registerer) + docker, err := stages.NewDocker(logger, registerer) if err != nil { return nil, err }