From 8dd490d238f555e25fdfdf3cb1ae1422ff573099 Mon Sep 17 00:00:00 2001
From: Edward Welch <edward.welch@grafana.com>
Date: Fri, 12 Apr 2019 07:53:31 -0400
Subject: [PATCH] beginnings of label extraction pipeline

---
 pkg/logentry/pipeline.go                  | 67 +++++++++++++++++++++++
 pkg/logentry/pipeline_test.go             | 27 +++++++++
 pkg/logentry/stages/regex.go              | 32 +++++++++++
 pkg/promtail/promtail_test.go             |  2 +
 pkg/promtail/scrape/scrape.go             |  6 +-
 pkg/promtail/targets/filetargetmanager.go |  7 ++-
 6 files changed, 137 insertions(+), 4 deletions(-)
 create mode 100644 pkg/logentry/pipeline.go
 create mode 100644 pkg/logentry/pipeline_test.go
 create mode 100644 pkg/logentry/stages/regex.go

diff --git a/pkg/logentry/pipeline.go b/pkg/logentry/pipeline.go
new file mode 100644
index 00000000..ca587489
--- /dev/null
+++ b/pkg/logentry/pipeline.go
@@ -0,0 +1,67 @@
+package logentry
+
+import (
+	"time"
+
+	"github.com/prometheus/common/model"
+
+	"github.com/grafana/loki/pkg/promtail/api"
+)
+
+// Parser takes an existing set of labels, timestamp and log entry and returns either a possibly mutated
+// timestamp and log entry
+type Parser interface {
+	Parse(labels model.LabelSet, time *time.Time, entry *string)
+}
+
+type PipelineStage map[interface{}]interface{}
+
+type Pipeline struct {
+	stages []Parser
+}
+
+func NewPipeline(stages []PipelineStage) (Pipeline, error) {
+
+	//rg := config.ParserStages[0]["regex"]
+
+	//The idea is to load the stages, possibly using reflection to instantiate based on packages?
+	//Then the processor will pass every log line through the pipeline stages
+	//With debug logging to show what the labels are before/after as well as timestamp and log message
+	//Metrics to cover each pipeline stage and the entire process (do we have to pass the metrics in to the processor??)
+
+	//Error handling, fail on setup errors?, fail on pipeline processing errors?
+
+	//we handle safe casting so we can direct the user to yaml issues if the key isn't a string
+
+	//for _, s := range config.ParserStages {
+	//	if len(s) > 1 {
+	//		panic("Pipeline stages must contain only one key:")
+	//	}
+	//
+	//	switch s {
+	//	case "regex":
+	//		var cfg2 Config
+	//		err := mapstructure.Decode(rg, &cfg2)
+	//		if err != nil {
+	//			panic(err)
+	//		}
+	//	}
+	//}
+	return Pipeline{}, nil
+}
+
+func (p *Pipeline) Process(labels model.LabelSet, time *time.Time, entry *string) {
+	//debug log labels, time, and string
+	for _, parser := range p.stages {
+		parser.Parse(labels, time, entry)
+		//debug log labels, time, and string
+	}
+}
+
+func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
+	return api.EntryHandlerFunc(func(labels model.LabelSet, timestamp time.Time, line string) error {
+		p.Process(labels, &timestamp, &line)
+		return next.Handle(labels, timestamp, line)
+	})

+}
diff --git a/pkg/logentry/pipeline_test.go b/pkg/logentry/pipeline_test.go
new file mode 100644
index 00000000..cece7c25
--- /dev/null
+++ b/pkg/logentry/pipeline_test.go
@@ -0,0 +1,27 @@
+package logentry
+
+import (
+	"testing"
+
+	"gopkg.in/yaml.v2"
+
+	"github.com/grafana/loki/pkg/logentry/stages"
+)
+
+var testYaml = `
+parser_stages:
+  - regex:
+      expr: ./*
+      labels:
+        - test:
+            source: somesource
+
+`
+
+func TestNewProcessor(t *testing.T) {
+	var config stages.Config
+	err := yaml.Unmarshal([]byte(testYaml), &config)
+	if err != nil {
+		panic(err)
+	}
+}
diff --git a/pkg/logentry/stages/regex.go b/pkg/logentry/stages/regex.go
new file mode 100644
index 00000000..96619dc5
--- /dev/null
+++ b/pkg/logentry/stages/regex.go
@@ -0,0 +1,32 @@
+package stages
+
+import (
+	"regexp"
+	"time"
+
+	"github.com/mitchellh/mapstructure"
+	"github.com/prometheus/common/model"
+
+	"github.com/grafana/loki/pkg/parser"
+)
+
+type Config struct {
+	Expr   string
+	Labels []parser.Label
+}
+
+type Regex struct {
+	expr *regexp.Regexp
+}
+
+func NewRegex(config map[interface{}]interface{}) Regex {
+
+	err := mapstructure.Decode(rg, &cfg2)
+	return Regex{
+		expr: regexp.MustCompile(config.Expr),
+	}
+}
+
+func (r *Regex) Parse(labels model.LabelSet, time time.Time, entry string) (time.Time, string, error) {
+
+}
diff --git a/pkg/promtail/promtail_test.go b/pkg/promtail/promtail_test.go
index 31f5b2aa..0034af87 100644
--- a/pkg/promtail/promtail_test.go
+++ b/pkg/promtail/promtail_test.go
@@ -25,7 +25,9 @@ import (
 	"github.com/prometheus/prometheus/promql"
 	"github.com/stretchr/testify/assert"
 
+	"github.com/grafana/loki/pkg/logentry"
 	"github.com/grafana/loki/pkg/logproto"
+	"github.com/grafana/loki/pkg/parser"
 	"github.com/grafana/loki/pkg/promtail/api"
 	"github.com/grafana/loki/pkg/promtail/config"
 	"github.com/grafana/loki/pkg/promtail/scrape"
diff --git a/pkg/promtail/scrape/scrape.go b/pkg/promtail/scrape/scrape.go
index 932b51e4..714809d7 100644
--- a/pkg/promtail/scrape/scrape.go
+++ b/pkg/promtail/scrape/scrape.go
@@ -6,20 +6,20 @@ import (
 	sd_config "github.com/prometheus/prometheus/discovery/config"
 	"github.com/prometheus/prometheus/pkg/relabel"
 
-	"github.com/grafana/loki/pkg/promtail/api"
+	"github.com/grafana/loki/pkg/logentry"
 )
 
 // Config describes a job to scrape.
 type Config struct {
 	JobName                string                           `yaml:"job_name,omitempty"`
-	EntryParser            api.EntryParser                  `yaml:"entry_parser"`
+	PipelineStages         []logentry.PipelineStage         `yaml:"pipeline_stages,omitempty"`
 	RelabelConfigs         []*relabel.Config                `yaml:"relabel_configs,omitempty"`
 	ServiceDiscoveryConfig sd_config.ServiceDiscoveryConfig `yaml:",inline"`
 }
 
 // DefaultScrapeConfig is the default Config.
 var DefaultScrapeConfig = Config{
-	EntryParser: api.Docker,
+	//PipelineStages: api.Docker,
 }
 
 // UnmarshalYAML implements the yaml.Unmarshaler interface.
diff --git a/pkg/promtail/targets/filetargetmanager.go b/pkg/promtail/targets/filetargetmanager.go
index 6aded414..ac2204f6 100644
--- a/pkg/promtail/targets/filetargetmanager.go
+++ b/pkg/promtail/targets/filetargetmanager.go
@@ -19,6 +19,7 @@ import (
 	"github.com/prometheus/prometheus/relabel"
 
 	"github.com/grafana/loki/pkg/helpers"
+	"github.com/grafana/loki/pkg/logentry"
 	"github.com/grafana/loki/pkg/promtail/api"
 	"github.com/grafana/loki/pkg/promtail/positions"
 	"github.com/grafana/loki/pkg/promtail/scrape"
@@ -73,6 +74,10 @@ func NewFileTargetManager(
 
 	config := map[string]sd_config.ServiceDiscoveryConfig{}
 	for _, cfg := range scrapeConfigs {
+		pipeline, err := logentry.NewPipeline(cfg.PipelineStages)
+		if err != nil {
+			return nil, err
+		}
 		s := &targetSyncer{
 			log:       logger,
 			positions: positions,
@@ -80,7 +85,7 @@ func NewFileTargetManager(
 			targets:        map[string]*FileTarget{},
 			droppedTargets: []Target{},
 			hostname:       hostname,
-			entryHandler:   cfg.EntryParser.Wrap(client),
+			entryHandler:   pipeline.Wrap(client),
 			targetConfig:   targetConfig,
 		}
 		tm.syncers[cfg.JobName] = s
-- 
GitLab