Skip to content
Snippets Groups Projects
Commit 97e4a122 authored by Edward Welch's avatar Edward Welch Committed by Ed
Browse files

adding a pipeline processing histogram

parent 0a9beea9
No related branches found
No related tags found
No related merge requests found
......@@ -37,6 +37,9 @@ func main() {
}
}
// Re-init the logger which will now honor a different log level set in ServerConfig.Config
util.InitLogger(&config.ServerConfig.Config)
p, err := promtail.New(config)
if err != nil {
level.Error(util.Logger).Log("msg", "error creating promtail", "error", err)
......
......@@ -6,6 +6,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/logentry/stages"
......@@ -19,10 +20,11 @@ type PipelineStages []interface{}
type Pipeline struct {
logger log.Logger
stages []stages.Stage
plObs *prometheus.Observer
}
// NewPipeline creates a new log entry pipeline from a configuration
func NewPipeline(logger log.Logger, stgs PipelineStages) (*Pipeline, error) {
func NewPipeline(logger log.Logger, stgs PipelineStages, plObserverMicroSeconds *prometheus.Observer) (*Pipeline, error) {
st := []stages.Stage{}
for _, s := range stgs {
stage, ok := s.(map[interface{}]interface{})
......@@ -69,16 +71,22 @@ func NewPipeline(logger log.Logger, stgs PipelineStages) (*Pipeline, error) {
return &Pipeline{
logger: log.With(logger, "component", "pipeline"),
stages: st,
plObs: plObserverMicroSeconds,
}, nil
}
// Process mutates an entry and its metadata by using multiple configure stage.
func (p *Pipeline) Process(labels model.LabelSet, time *time.Time, entry *string) {
func (p *Pipeline) Process(labels model.LabelSet, ts *time.Time, entry *string) {
start := time.Now()
for i, stage := range p.stages {
level.Debug(p.logger).Log("msg", "processing pipeline", "stage", i, "labels", labels, "time", time, "entry", entry)
stage.Process(labels, time, entry)
level.Debug(p.logger).Log("msg", "processing pipeline", "stage", i, "labels", labels, "time", ts, "entry", entry)
stage.Process(labels, ts, entry)
}
durUs := float64(time.Since(start).Nanoseconds()) / 1000
level.Debug(p.logger).Log("msg", "finished processing log line", "labels", labels, "time", ts, "entry", entry, "duration_us", durUs)
if p.plObs != nil {
(*p.plObs).Observe(durUs)
}
level.Debug(p.logger).Log("msg", "finished processing log line", "labels", labels, "time", time, "entry", entry)
}
// Wrap implements EntryMiddleware
......
......@@ -42,7 +42,7 @@ func loadConfig(yml string) PipelineStages {
func TestNewPipeline(t *testing.T) {
p, err := NewPipeline(util.Logger, loadConfig(testYaml))
p, err := NewPipeline(util.Logger, loadConfig(testYaml), nil)
if err != nil {
panic(err)
}
......@@ -62,7 +62,7 @@ func TestPipeline_MultiStage(t *testing.T) {
if err != nil {
panic(err)
}
p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}))
p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), nil)
if err != nil {
panic(err)
}
......@@ -138,7 +138,7 @@ func Benchmark(b *testing.B) {
}
for _, bm := range benchmarks {
b.Run(bm.name, func(b *testing.B) {
pl, err := NewPipeline(bm.logger, bm.stgs)
pl, err := NewPipeline(bm.logger, bm.stgs, nil)
if err != nil {
panic(err)
}
......
......@@ -42,6 +42,12 @@ var (
Name: "targets_active_total",
Help: "Number of active total.",
})
pipelineDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "promtail",
Name: "pipeline_duration_microseconds",
Help: "Label extraction pipeline processing time, in microseconds",
Buckets: []float64{5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 25000},
}, []string{"job_name"})
)
// FileTargetManager manages a set of targets.
......@@ -75,8 +81,8 @@ func NewFileTargetManager(
config := map[string]sd_config.ServiceDiscoveryConfig{}
for _, cfg := range scrapeConfigs {
pipeline, err := logentry.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages)
obs := pipelineDuration.WithLabelValues(cfg.JobName)
pipeline, err := logentry.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, &obs)
if err != nil {
return nil, err
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment