From 97e4a122cac011bb6153a199163faeb04b1b7daa Mon Sep 17 00:00:00 2001
From: Edward Welch <edward.welch@grafana.com>
Date: Mon, 6 May 2019 11:54:51 -0400
Subject: [PATCH] adding a pipeline processing histogram

---
 cmd/promtail/main.go                      |  3 +++
 pkg/logentry/pipeline.go                  | 18 +++++++++++++-----
 pkg/logentry/pipeline_test.go             |  6 +++---
 pkg/promtail/targets/filetargetmanager.go | 10 ++++++++--
 4 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/cmd/promtail/main.go b/cmd/promtail/main.go
index d5c34433..ba9fb026 100644
--- a/cmd/promtail/main.go
+++ b/cmd/promtail/main.go
@@ -37,6 +37,9 @@ func main() {
 		}
 	}
 
+	// Re-init the logger which will now honor a different log level set in ServerConfig.Config
+	util.InitLogger(&config.ServerConfig.Config)
+
 	p, err := promtail.New(config)
 	if err != nil {
 		level.Error(util.Logger).Log("msg", "error creating promtail", "error", err)
diff --git a/pkg/logentry/pipeline.go b/pkg/logentry/pipeline.go
index d8300ce7..372c1db5 100644
--- a/pkg/logentry/pipeline.go
+++ b/pkg/logentry/pipeline.go
@@ -6,6 +6,7 @@ import (
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 
 	"github.com/grafana/loki/pkg/logentry/stages"
@@ -19,10 +20,11 @@ type PipelineStages []interface{}
 type Pipeline struct {
 	logger log.Logger
 	stages []stages.Stage
+	plObs  *prometheus.Observer
 }
 
 // NewPipeline creates a new log entry pipeline from a configuration
-func NewPipeline(logger log.Logger, stgs PipelineStages) (*Pipeline, error) {
+func NewPipeline(logger log.Logger, stgs PipelineStages, plObserverMicroSeconds *prometheus.Observer) (*Pipeline, error) {
 	st := []stages.Stage{}
 	for _, s := range stgs {
 		stage, ok := s.(map[interface{}]interface{})
@@ -69,16 +71,22 @@ func NewPipeline(logger log.Logger, stgs PipelineStages) (*Pipeline, error) {
 	return &Pipeline{
 		logger: log.With(logger, "component", "pipeline"),
 		stages: st,
+		plObs:  plObserverMicroSeconds,
 	}, nil
 }
 
 // Process mutates an entry and its metadata by using multiple configure stage.
-func (p *Pipeline) Process(labels model.LabelSet, time *time.Time, entry *string) {
+func (p *Pipeline) Process(labels model.LabelSet, ts *time.Time, entry *string) {
+	start := time.Now()
 	for i, stage := range p.stages {
-		level.Debug(p.logger).Log("msg", "processing pipeline", "stage", i, "labels", labels, "time", time, "entry", entry)
-		stage.Process(labels, time, entry)
+		level.Debug(p.logger).Log("msg", "processing pipeline", "stage", i, "labels", labels, "time", ts, "entry", entry)
+		stage.Process(labels, ts, entry)
+	}
+	durUs := float64(time.Since(start).Nanoseconds()) / 1000
+	level.Debug(p.logger).Log("msg", "finished processing log line", "labels", labels, "time", ts, "entry", entry, "duration_us", durUs)
+	if p.plObs != nil {
+		(*p.plObs).Observe(durUs)
 	}
-	level.Debug(p.logger).Log("msg", "finished processing log line", "labels", labels, "time", time, "entry", entry)
 }
 
 // Wrap implements EntryMiddleware
diff --git a/pkg/logentry/pipeline_test.go b/pkg/logentry/pipeline_test.go
index c7930c33..322db9b2 100644
--- a/pkg/logentry/pipeline_test.go
+++ b/pkg/logentry/pipeline_test.go
@@ -42,7 +42,7 @@ func loadConfig(yml string) PipelineStages {
 
 func TestNewPipeline(t *testing.T) {
 
-	p, err := NewPipeline(util.Logger, loadConfig(testYaml))
+	p, err := NewPipeline(util.Logger, loadConfig(testYaml), nil)
 	if err != nil {
 		panic(err)
 	}
@@ -62,7 +62,7 @@ func TestPipeline_MultiStage(t *testing.T) {
 	if err != nil {
 		panic(err)
 	}
-	p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}))
+	p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), nil)
 	if err != nil {
 		panic(err)
 	}
@@ -138,7 +138,7 @@ func Benchmark(b *testing.B) {
 	}
 	for _, bm := range benchmarks {
 		b.Run(bm.name, func(b *testing.B) {
-			pl, err := NewPipeline(bm.logger, bm.stgs)
+			pl, err := NewPipeline(bm.logger, bm.stgs, nil)
 			if err != nil {
 				panic(err)
 			}
diff --git a/pkg/promtail/targets/filetargetmanager.go b/pkg/promtail/targets/filetargetmanager.go
index e6351823..6ab04031 100644
--- a/pkg/promtail/targets/filetargetmanager.go
+++ b/pkg/promtail/targets/filetargetmanager.go
@@ -42,6 +42,12 @@ var (
 		Name:      "targets_active_total",
 		Help:      "Number of active total.",
 	})
+	pipelineDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "promtail",
+		Name:      "pipeline_duration_microseconds",
+		Help:      "Label extraction pipeline processing time, in microseconds",
+		Buckets:   []float64{5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 25000},
+	}, []string{"job_name"})
 )
 
 // FileTargetManager manages a set of targets.
@@ -75,8 +81,8 @@ func NewFileTargetManager(
 
 	config := map[string]sd_config.ServiceDiscoveryConfig{}
 	for _, cfg := range scrapeConfigs {
-
-		pipeline, err := logentry.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages)
+		obs := pipelineDuration.WithLabelValues(cfg.JobName)
+		pipeline, err := logentry.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, &obs)
 		if err != nil {
 			return nil, err
 		}
-- 
GitLab