Skip to content
Snippets Groups Projects
Commit 8dd490d2 authored by Edward Welch's avatar Edward Welch Committed by Ed
Browse files

beginnings of label extraction pipeline

parent 64465035
No related branches found
No related tags found
No related merge requests found
package logentry
import (
"time"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/promtail/api"
)
// Parser takes an existing set of labels, timestamp and log entry and returns either a possibly mutated
// timestamp and log entry
type Parser interface {
Parse(labels model.LabelSet, time *time.Time, entry *string)
}
type PipelineStage map[interface{}]interface{}
type Pipeline struct {
stages []Parser
}
func NewPipeline(stages []PipelineStage) (Pipeline, error) {
//rg := config.ParserStages[0]["regex"]
//The idea is to load the stages, possibly using reflection to instantiate based on packages?
//Then the processor will pass every log line through the pipeline stages
//With debug logging to show what the labels are before/after as well as timestamp and log message
//Metrics to cover each pipeline stage and the entire process (do we have to pass the metrics in to the processor??)
//Error handling, fail on setup errors?, fail on pipeline processing errors?
//we handle safe casting so we can direct the user to yaml issues if the key isn't a string
//for _, s := range config.ParserStages {
// if len(s) > 1 {
// panic("Pipeline stages must contain only one key:")
// }
//
// switch s {
// case "regex":
// var cfg2 Config
// err := mapstructure.Decode(rg, &cfg2)
// if err != nil {
// panic(err)
// }
// }
//}
return Pipeline{}, nil
}
func (p *Pipeline) Process(labels model.LabelSet, time *time.Time, entry *string) {
//debug log labels, time, and string
for _, parser := range p.stages {
parser.Parse(labels, time, entry)
//debug log labels, time, and string
}
}
func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
return api.EntryHandlerFunc(func(labels model.LabelSet, timestamp time.Time, line string) error {
p.Process(labels, &timestamp, &line)
return next.Handle(labels, timestamp, line)
})
}
package logentry
import (
"testing"
"gopkg.in/yaml.v2"
"github.com/grafana/loki/pkg/logentry/stages"
)
var testYaml = `
parser_stages:
- regex:
expr: ./*
labels:
- test:
source: somesource
`
func TestNewProcessor(t *testing.T) {
var config stages.Config
err := yaml.Unmarshal([]byte(testYaml), &config)
if err != nil {
panic(err)
}
}
package stages
import (
"regexp"
"time"
"github.com/mitchellh/mapstructure"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/parser"
)
type Config struct {
Expr string
Labels []parser.Label
}
type Regex struct {
expr *regexp.Regexp
}
func NewRegex(config map[interface{}]interface{}) Regex {
err := mapstructure.Decode(rg, &cfg2)
return Regex{
expr: regexp.MustCompile(config.Expr),
}
}
func (r *Regex) Parse(labels model.LabelSet, time time.Time, entry string) (time.Time, string, error) {
}
......@@ -25,7 +25,9 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/assert"
"github.com/grafana/loki/pkg/logentry"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/parser"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/config"
"github.com/grafana/loki/pkg/promtail/scrape"
......
......@@ -6,20 +6,20 @@ import (
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/logentry"
)
// Config describes a job to scrape.
type Config struct {
JobName string `yaml:"job_name,omitempty"`
EntryParser api.EntryParser `yaml:"entry_parser"`
PipelineStages []logentry.PipelineStage `yaml:"pipeline_stages,omitempty"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
ServiceDiscoveryConfig sd_config.ServiceDiscoveryConfig `yaml:",inline"`
}
// DefaultScrapeConfig is the default Config.
var DefaultScrapeConfig = Config{
EntryParser: api.Docker,
//PipelineStages: api.Docker,
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
......
......@@ -19,6 +19,7 @@ import (
"github.com/prometheus/prometheus/relabel"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/logentry"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/positions"
"github.com/grafana/loki/pkg/promtail/scrape"
......@@ -73,6 +74,10 @@ func NewFileTargetManager(
config := map[string]sd_config.ServiceDiscoveryConfig{}
for _, cfg := range scrapeConfigs {
pipeline, err := logentry.NewPipeline(cfg.PipelineStages)
if err != nil {
return nil, err
}
s := &targetSyncer{
log: logger,
positions: positions,
......@@ -80,7 +85,7 @@ func NewFileTargetManager(
targets: map[string]*FileTarget{},
droppedTargets: []Target{},
hostname: hostname,
entryHandler: cfg.EntryParser.Wrap(client),
entryHandler: pipeline.Wrap(client),
targetConfig: targetConfig,
}
tm.syncers[cfg.JobName] = s
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment