From 3a2ac64b7ced573e7a7ed513b7f1e4df344a1c4d Mon Sep 17 00:00:00 2001
From: Edward Welch <edward.welch@grafana.com>
Date: Thu, 30 May 2019 15:58:09 -0400
Subject: [PATCH] cleaning up remaining TODO's, adding tests cleaning up GoDoc
 pipeline_name is now optional for matcher stage

---
 pkg/logentry/stages/configs.go            |  7 ----
 pkg/logentry/stages/extensions.go         |  9 ++---
 pkg/logentry/stages/extensions_test.go    |  4 +-
 pkg/logentry/stages/json.go               |  7 ++--
 pkg/logentry/stages/json_test.go          |  2 +-
 pkg/logentry/stages/labels.go             |  8 +++-
 pkg/logentry/stages/labels_test.go        | 49 ++++++++++++++++++++++-
 pkg/logentry/stages/match.go              | 23 ++++++++---
 pkg/logentry/stages/match_test.go         | 31 ++++++++++++--
 pkg/logentry/stages/metrics.go            | 14 ++++++-
 pkg/logentry/stages/metrics_test.go       |  8 ++--
 pkg/logentry/stages/output.go             |  7 +++-
 pkg/logentry/stages/output_test.go        | 35 +++++++++++++++-
 pkg/logentry/stages/pipeline.go           | 46 ++++++++++++---------
 pkg/logentry/stages/pipeline_test.go      |  7 ++--
 pkg/logentry/stages/regex.go              |  6 ++-
 pkg/logentry/stages/regex_test.go         |  4 +-
 pkg/logentry/stages/stage.go              | 16 ++++----
 pkg/logentry/stages/timestamp.go          |  7 +++-
 pkg/logentry/stages/timestamp_test.go     | 41 +++++++++++++++++--
 pkg/logentry/stages/util.go               |  1 +
 pkg/promtail/targets/filetargetmanager.go |  6 +--
 22 files changed, 253 insertions(+), 85 deletions(-)
 delete mode 100644 pkg/logentry/stages/configs.go

diff --git a/pkg/logentry/stages/configs.go b/pkg/logentry/stages/configs.go
deleted file mode 100644
index 1b4b85a2..00000000
--- a/pkg/logentry/stages/configs.go
+++ /dev/null
@@ -1,7 +0,0 @@
-package stages
-
-const (
-	MetricTypeCounter   = "counter"
-	MetricTypeGauge     = "gauge"
-	MetricTypeHistogram = "histogram"
-)
diff --git a/pkg/logentry/stages/extensions.go b/pkg/logentry/stages/extensions.go
index 27fb392e..9f2030c4 100644
--- a/pkg/logentry/stages/extensions.go
+++ b/pkg/logentry/stages/extensions.go
@@ -8,7 +8,7 @@ import (
 const RFC3339Nano = "RFC3339Nano"
 
 // NewDocker creates a Docker json log format specific pipeline stage.
-func NewDocker(logger log.Logger, jobName string, registerer prometheus.Registerer) (Stage, error) {
+func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, error) {
 	stages := PipelineStages{
 		PipelineStage{
 			StageTypeJSON: JSONConfig{
@@ -32,12 +32,11 @@ func NewDocker(logger log.Logger, jobName string, registerer prometheus.Register
 				"output",
 			},
 		}}
-
-	return NewPipeline(logger, stages, jobName+"_docker", registerer)
+	return NewPipeline(logger, stages, nil, registerer)
 }
 
 // NewCRI creates a CRI format specific pipeline stage
-func NewCRI(logger log.Logger, jobName string, registerer prometheus.Registerer) (Stage, error) {
+func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error) {
 	stages := PipelineStages{
 		PipelineStage{
 			StageTypeRegex: RegexConfig{
@@ -61,5 +60,5 @@ func NewCRI(logger log.Logger, jobName string, registerer prometheus.Registerer)
 			},
 		},
 	}
-	return NewPipeline(logger, stages, jobName+"_cri", registerer)
+	return NewPipeline(logger, stages, nil, registerer)
 }
diff --git a/pkg/logentry/stages/extensions_test.go b/pkg/logentry/stages/extensions_test.go
index 9f44d0a8..f544614c 100644
--- a/pkg/logentry/stages/extensions_test.go
+++ b/pkg/logentry/stages/extensions_test.go
@@ -65,7 +65,7 @@ func TestNewDocker(t *testing.T) {
 		tt := tt
 		t.Run(tName, func(t *testing.T) {
 			t.Parallel()
-			p, err := NewDocker(util.Logger, "test", prometheus.DefaultRegisterer)
+			p, err := NewDocker(util.Logger, prometheus.DefaultRegisterer)
 			if err != nil {
 				t.Fatalf("failed to create Docker parser: %s", err)
 			}
@@ -141,7 +141,7 @@ func TestNewCri(t *testing.T) {
 		tt := tt
 		t.Run(tName, func(t *testing.T) {
 			t.Parallel()
-			p, err := NewCRI(util.Logger, "test", prometheus.DefaultRegisterer)
+			p, err := NewCRI(util.Logger, prometheus.DefaultRegisterer)
 			if err != nil {
 				t.Fatalf("failed to create CRI parser: %s", err)
 			}
diff --git a/pkg/logentry/stages/json.go b/pkg/logentry/stages/json.go
index a7df0150..2458d744 100644
--- a/pkg/logentry/stages/json.go
+++ b/pkg/logentry/stages/json.go
@@ -21,6 +21,7 @@ const (
 	ErrEmptyJSONStageConfig = "empty json stage configuration"
 )
 
+// JSONConfig represents a JSON Stage configuration
 type JSONConfig struct {
 	Expressions map[string]string `mapstructure:"expressions"`
 }
@@ -52,14 +53,14 @@ func validateJSONConfig(c *JSONConfig) (map[string]*jmespath.JMESPath, error) {
 	return expressions, nil
 }
 
-// jsonStage extracts log data via json parsing.
+// jsonStage sets extracted data using JMESPath expressions
 type jsonStage struct {
 	cfg         *JSONConfig
 	expressions map[string]*jmespath.JMESPath
 	logger      log.Logger
 }
 
-// newJSONStage creates a new json mutator from a config.
+// newJSONStage creates a new json pipeline stage from a config.
 func newJSONStage(logger log.Logger, config interface{}) (*jsonStage, error) {
 	cfg, err := parseJSONConfig(config)
 	if err != nil {
@@ -85,7 +86,7 @@ func parseJSONConfig(config interface{}) (*JSONConfig, error) {
 	return cfg, nil
 }
 
-// Process implements Mutator
+// Process implements Stage
 func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
 	if entry == nil {
 		level.Debug(j.logger).Log("msg", "cannot parse a nil entry")
diff --git a/pkg/logentry/stages/json_test.go b/pkg/logentry/stages/json_test.go
index ab8173fd..772a506c 100644
--- a/pkg/logentry/stages/json_test.go
+++ b/pkg/logentry/stages/json_test.go
@@ -174,7 +174,7 @@ func TestJSONParser_Parse(t *testing.T) {
 		tt := tt
 		t.Run(tName, func(t *testing.T) {
 			t.Parallel()
-			p, err := New(util.Logger, "test", StageTypeJSON, tt.config, nil)
+			p, err := New(util.Logger, nil, StageTypeJSON, tt.config, nil)
 			if err != nil {
 				t.Fatalf("failed to create json parser: %s", err)
 			}
diff --git a/pkg/logentry/stages/labels.go b/pkg/logentry/stages/labels.go
index 504d8c19..d5f75ba5 100644
--- a/pkg/logentry/stages/labels.go
+++ b/pkg/logentry/stages/labels.go
@@ -20,6 +20,7 @@ const (
 // LabelsConfig is a set of labels to be extracted
 type LabelsConfig map[string]*string
 
+// validateLabelsConfig validates the Label stage configuration
 func validateLabelsConfig(c LabelsConfig) error {
 	if c == nil {
 		return errors.New(ErrEmptyLabelStageConfig)
@@ -37,8 +38,8 @@ func validateLabelsConfig(c LabelsConfig) error {
 	return nil
 }
 
-// newLabel creates a new label stage to set labels from extracted data
-func newLabel(logger log.Logger, configs interface{}) (*labelStage, error) {
+// newLabelStage creates a new label stage to set labels from extracted data
+func newLabelStage(logger log.Logger, configs interface{}) (*labelStage, error) {
 	cfgs := &LabelsConfig{}
 	err := mapstructure.Decode(configs, cfgs)
 	if err != nil {
@@ -54,11 +55,13 @@ func newLabel(logger log.Logger, configs interface{}) (*labelStage, error) {
 	}, nil
 }
 
+// labelStage sets labels from extracted data
 type labelStage struct {
 	cfgs   LabelsConfig
 	logger log.Logger
 }
 
+// Process implements Stage
 func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
 	for lName, lSrc := range l.cfgs {
 		if _, ok := extracted[*lSrc]; ok {
@@ -66,6 +69,7 @@ func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interfa
 			s, err := getString(lValue)
 			if err != nil {
 				level.Debug(l.logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue).String())
+				continue
 			}
 			labelValue := model.LabelValue(s)
 			if !labelValue.IsValid() {
diff --git a/pkg/logentry/stages/labels_test.go b/pkg/logentry/stages/labels_test.go
index 75a4b1cc..af594d73 100644
--- a/pkg/logentry/stages/labels_test.go
+++ b/pkg/logentry/stages/labels_test.go
@@ -33,7 +33,7 @@ var testLabelsLogLine = `
 `
 
 func TestLabelsPipeline_Labels(t *testing.T) {
-	pl, err := NewPipeline(util.Logger, loadConfig(testLabelsYaml), "test", prometheus.DefaultRegisterer)
+	pl, err := NewPipeline(util.Logger, loadConfig(testLabelsYaml), nil, prometheus.DefaultRegisterer)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -108,4 +108,49 @@ func TestLabels(t *testing.T) {
 	}
 }
 
-//TODO test label processing
+func TestLabelStage_Process(t *testing.T) {
+	sourceName := "diff_source"
+	tests := map[string]struct {
+		config         LabelsConfig
+		extractedData  map[string]interface{}
+		inputLabels    model.LabelSet
+		expectedLabels model.LabelSet
+	}{
+		"extract_success": {
+			LabelsConfig{
+				"testLabel": nil,
+			},
+			map[string]interface{}{
+				"testLabel": "testValue",
+			},
+			model.LabelSet{},
+			model.LabelSet{
+				"testLabel": "testValue",
+			},
+		},
+		"different_source_name": {
+			LabelsConfig{
+				"testLabel": &sourceName,
+			},
+			map[string]interface{}{
+				sourceName: "testValue",
+			},
+			model.LabelSet{},
+			model.LabelSet{
+				"testLabel": "testValue",
+			},
+		},
+	}
+	for name, test := range tests {
+		test := test
+		t.Run(name, func(t *testing.T) {
+			t.Parallel()
+			st, err := newLabelStage(util.Logger, test.config)
+			if err != nil {
+				t.Fatal(err)
+			}
+			st.Process(test.inputLabels, test.extractedData, nil, nil)
+			assert.Equal(t, test.expectedLabels, test.expectedLabels)
+		})
+	}
+}
diff --git a/pkg/logentry/stages/match.go b/pkg/logentry/stages/match.go
index 493875bf..59895264 100644
--- a/pkg/logentry/stages/match.go
+++ b/pkg/logentry/stages/match.go
@@ -15,23 +15,25 @@ import (
 
 const (
 	ErrEmptyMatchStageConfig = "match stage config cannot be empty"
-	ErrPipelineNameRequired  = "match stage must specify a pipeline name which is used in the exported metrics"
+	ErrPipelineNameRequired  = "match stage pipeline name can be omitted but cannot be an empty string"
 	ErrSelectorRequired      = "selector statement required for match stage"
 	ErrMatchRequiresStages   = "match stage requires at least one additional stage to be defined in '- stages'"
 	ErrSelectorSyntax        = "invalid selector syntax for match stage"
 )
 
+// MatcherConfig contains the configuration for a matcherStage
 type MatcherConfig struct {
-	PipelineName string         `mapstructure:"pipeline_name"`
+	PipelineName *string        `mapstructure:"pipeline_name"`
 	Selector     string         `mapstructure:"selector"`
 	Stages       PipelineStages `mapstructure:"stages"`
 }
 
+// validateMatcherConfig validates the MatcherConfig for the matcherStage
 func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) {
 	if cfg == nil {
 		return nil, errors.New(ErrEmptyMatchStageConfig)
 	}
-	if cfg.PipelineName == "" {
+	if cfg.PipelineName != nil && *cfg.PipelineName == "" {
 		return nil, errors.New(ErrPipelineNameRequired)
 	}
 	if cfg.Selector == "" {
@@ -47,7 +49,8 @@ func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) {
 	return matchers, nil
 }
 
-func newMatcherStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
+// newMatcherStage creates a new matcherStage from config
+func newMatcherStage(logger log.Logger, jobName *string, config interface{}, registerer prometheus.Registerer) (Stage, error) {
 	cfg := &MatcherConfig{}
 	err := mapstructure.Decode(config, cfg)
 	if err != nil {
@@ -58,9 +61,15 @@ func newMatcherStage(logger log.Logger, config interface{}, registerer prometheu
 		return nil, err
 	}
 
-	pl, err := NewPipeline(logger, cfg.Stages, cfg.PipelineName, registerer)
+	var nPtr *string
+	if cfg.PipelineName != nil && jobName != nil {
+		name := *jobName + "_" + *cfg.PipelineName
+		nPtr = &name
+	}
+
+	pl, err := NewPipeline(logger, cfg.Stages, nPtr, registerer)
 	if err != nil {
-		return nil, errors.Wrapf(err, "match stage %s failed to create pipeline", cfg.PipelineName)
+		return nil, errors.Wrapf(err, "match stage failed to create pipeline from config: %v", config)
 	}
 
 	return &matcherStage{
@@ -71,6 +80,7 @@ func newMatcherStage(logger log.Logger, config interface{}, registerer prometheu
 	}, nil
 }
 
+// matcherStage applies Label matchers to determine if the include stages should be run
 type matcherStage struct {
 	cfgs     *MatcherConfig
 	matchers []*labels.Matcher
@@ -78,6 +88,7 @@ type matcherStage struct {
 	pipeline Stage
 }
 
+// Process implements Stage
 func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
 	for _, filter := range m.matchers {
 		if !filter.Matches(string(labels[model.LabelName(filter.Name)])) {
diff --git a/pkg/logentry/stages/match_test.go b/pkg/logentry/stages/match_test.go
index c8d66b24..68e50ffb 100644
--- a/pkg/logentry/stages/match_test.go
+++ b/pkg/logentry/stages/match_test.go
@@ -1,12 +1,14 @@
 package stages
 
 import (
+	"bytes"
 	"fmt"
 	"testing"
 	"time"
 
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/expfmt"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/assert"
 )
@@ -19,7 +21,6 @@ pipeline_stages:
 - labels:
     app:
 - match:
-    pipeline_name: "app1"
     selector: "{app=\"loki\"}"
     stages:
     - json:
@@ -57,20 +58,42 @@ var testMatchLogLineApp2 = `
 `
 
 func TestMatchPipeline(t *testing.T) {
-	pl, err := NewPipeline(util.Logger, loadConfig(testMatchYaml), "test", prometheus.DefaultRegisterer)
+	registry := prometheus.NewRegistry()
+	plName := "test_pipeline"
+	pl, err := NewPipeline(util.Logger, loadConfig(testMatchYaml), &plName, registry)
 	if err != nil {
 		t.Fatal(err)
 	}
 	lbls := model.LabelSet{}
 	ts := time.Now()
+	// Process the first log line which should extract the output from the `message` field
 	entry := testMatchLogLineApp1
 	extracted := map[string]interface{}{}
 	pl.Process(lbls, extracted, &ts, &entry)
 	assert.Equal(t, "app1 log line", entry)
+
+	// Process the second log line which should extract the output from the `msg` field
 	entry = testMatchLogLineApp2
 	extracted = map[string]interface{}{}
 	pl.Process(lbls, extracted, &ts, &entry)
 	assert.Equal(t, "app2 log line", entry)
+
+	got, err := registry.Gather()
+	if err != nil {
+		t.Fatalf("gathering metrics failed: %s", err)
+	}
+	var gotBuf bytes.Buffer
+	enc := expfmt.NewEncoder(&gotBuf, expfmt.FmtText)
+	for _, mf := range got {
+		if err := enc.Encode(mf); err != nil {
+			t.Fatalf("encoding gathered metrics failed: %s", err)
+		}
+	}
+	gotStr := gotBuf.String()
+	// We should only get metrics from the main pipeline and the second match which defines the pipeline_name
+	assert.Contains(t, gotStr, "logentry_pipeline_duration_seconds_bucket{job_name=\"test_pipeline\"")
+	assert.Contains(t, gotStr, "logentry_pipeline_duration_seconds_bucket{job_name=\"test_pipeline_app2\"")
+	assert.NotContains(t, gotStr, "logentry_pipeline_duration_seconds_bucket{job_name=\"test_pipeline_app1\"")
 }
 
 func TestMatcher(t *testing.T) {
@@ -103,7 +126,7 @@ func TestMatcher(t *testing.T) {
 			// Build a match config which has a simple label stage that when matched will add the test_label to
 			// the labels in the pipeline.
 			matchConfig := MatcherConfig{
-				"pl_name",
+				nil,
 				tt.matcher,
 				PipelineStages{
 					PipelineStage{
@@ -113,7 +136,7 @@ func TestMatcher(t *testing.T) {
 					},
 				},
 			}
-			s, err := newMatcherStage(util.Logger, matchConfig, prometheus.DefaultRegisterer)
+			s, err := newMatcherStage(util.Logger, nil, matchConfig, prometheus.DefaultRegisterer)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("withMatcher() error = %v, wantErr %v", err, tt.wantErr)
 				return
diff --git a/pkg/logentry/stages/metrics.go b/pkg/logentry/stages/metrics.go
index d1208fb9..b5149844 100644
--- a/pkg/logentry/stages/metrics.go
+++ b/pkg/logentry/stages/metrics.go
@@ -21,6 +21,10 @@ import (
 const customPrefix = "promtail_custom_"
 
 const (
+	MetricTypeCounter   = "counter"
+	MetricTypeGauge     = "gauge"
+	MetricTypeHistogram = "histogram"
+
 	ErrEmptyMetricsStageConfig = "empty metric stage configuration"
 )
 
@@ -51,8 +55,8 @@ func validateMetricsConfig(cfg MetricsConfig) error {
 	return nil
 }
 
-// newMetric creates a new set of metrics to process for each log entry
-func newMetric(logger log.Logger, config interface{}, registry prometheus.Registerer) (*metricStage, error) {
+// newMetricStage creates a new set of metrics to process for each log entry
+func newMetricStage(logger log.Logger, config interface{}, registry prometheus.Registerer) (*metricStage, error) {
 	cfgs := &MetricsConfig{}
 	err := mapstructure.Decode(config, cfgs)
 	if err != nil {
@@ -95,12 +99,14 @@ func newMetric(logger log.Logger, config interface{}, registry prometheus.Regist
 	}, nil
 }
 
+// metricStage creates and updates prometheus metrics based on extracted pipeline data
 type metricStage struct {
 	logger  log.Logger
 	cfg     MetricsConfig
 	metrics map[string]prometheus.Collector
 }
 
+// Process implements Stage
 func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
 	for name, collector := range m.metrics {
 		if v, ok := extracted[*m.cfg[name].Source]; ok {
@@ -116,6 +122,7 @@ func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interf
 	}
 }
 
+// recordCounter will update a counter metric
 func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) {
 	// If value matching is defined, make sure value matches.
 	if counter.Cfg.Value != nil {
@@ -144,6 +151,7 @@ func (m *metricStage) recordCounter(name string, counter *metric.Counters, label
 	}
 }
 
+// recordGauge will update a gauge metric
 func (m *metricStage) recordGauge(name string, gauge *metric.Gauges, labels model.LabelSet, v interface{}) {
 	// If value matching is defined, make sure value matches.
 	if gauge.Cfg.Value != nil {
@@ -188,6 +196,7 @@ func (m *metricStage) recordGauge(name string, gauge *metric.Gauges, labels mode
 	}
 }
 
+// recordHistogram will update a Histogram metric
 func (m *metricStage) recordHistogram(name string, histogram *metric.Histograms, labels model.LabelSet, v interface{}) {
 	// If value matching is defined, make sure value matches.
 	if histogram.Cfg.Value != nil {
@@ -210,6 +219,7 @@ func (m *metricStage) recordHistogram(name string, histogram *metric.Histograms,
 	histogram.With(labels).Observe(f)
 }
 
+// getFloat will take the provided value and return a float64 if possible
 func getFloat(unk interface{}) (float64, error) {
 
 	switch i := unk.(type) {
diff --git a/pkg/logentry/stages/metrics_test.go b/pkg/logentry/stages/metrics_test.go
index cc47b3d9..9d7c4b37 100644
--- a/pkg/logentry/stages/metrics_test.go
+++ b/pkg/logentry/stages/metrics_test.go
@@ -44,7 +44,7 @@ promtail_custom_loki_count 1.0
 
 func TestMetricsPipeline(t *testing.T) {
 	registry := prometheus.NewRegistry()
-	pl, err := NewPipeline(util.Logger, loadConfig(testMetricYaml), "test", registry)
+	pl, err := NewPipeline(util.Logger, loadConfig(testMetricYaml), nil, registry)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -150,15 +150,15 @@ func TestMetricStage_Process(t *testing.T) {
 	}
 
 	registry := prometheus.NewRegistry()
-	jsonStage, err := New(util.Logger, "test", StageTypeJSON, jsonConfig, registry)
+	jsonStage, err := New(util.Logger, nil, StageTypeJSON, jsonConfig, registry)
 	if err != nil {
 		t.Fatalf("failed to create stage with metrics: %v", err)
 	}
-	regexStage, err := New(util.Logger, "test", StageTypeRegex, regexConfig, registry)
+	regexStage, err := New(util.Logger, nil, StageTypeRegex, regexConfig, registry)
 	if err != nil {
 		t.Fatalf("failed to create stage with metrics: %v", err)
 	}
-	metricStage, err := New(util.Logger, "test", StageTypeMetric, metricsConfig, registry)
+	metricStage, err := New(util.Logger, nil, StageTypeMetric, metricsConfig, registry)
 	if err != nil {
 		t.Fatalf("failed to create stage with metrics: %v", err)
 	}
diff --git a/pkg/logentry/stages/output.go b/pkg/logentry/stages/output.go
index ea49acd2..88392871 100644
--- a/pkg/logentry/stages/output.go
+++ b/pkg/logentry/stages/output.go
@@ -22,6 +22,7 @@ type OutputConfig struct {
 	Source string `mapstructure:"source"`
 }
 
+// validateOutput validates the outputStage config
 func validateOutputConfig(cfg *OutputConfig) error {
 	if cfg == nil {
 		return errors.New(ErrEmptyOutputStageConfig)
@@ -32,8 +33,8 @@ func validateOutputConfig(cfg *OutputConfig) error {
 	return nil
 }
 
-// newLabel creates a new set of metrics to process for each log entry
-func newOutput(logger log.Logger, config interface{}) (*outputStage, error) {
+// newOutputStage creates a new outputStage
+func newOutputStage(logger log.Logger, config interface{}) (*outputStage, error) {
 	cfg := &OutputConfig{}
 	err := mapstructure.Decode(config, cfg)
 	if err != nil {
@@ -49,11 +50,13 @@ func newOutput(logger log.Logger, config interface{}) (*outputStage, error) {
 	}, nil
 }
 
+// outputStage will mutate the incoming entry and set it from extracted data
 type outputStage struct {
 	cfgs   *OutputConfig
 	logger log.Logger
 }
 
+// Process implements Stage
 func (o *outputStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
 	if o.cfgs == nil {
 		return
diff --git a/pkg/logentry/stages/output_test.go b/pkg/logentry/stages/output_test.go
index 00c1f5dd..8429dc7e 100644
--- a/pkg/logentry/stages/output_test.go
+++ b/pkg/logentry/stages/output_test.go
@@ -32,7 +32,7 @@ var testOutputLogLine = `
 `
 
 func TestPipeline_Output(t *testing.T) {
-	pl, err := NewPipeline(util.Logger, loadConfig(testOutputYaml), "test", prometheus.DefaultRegisterer)
+	pl, err := NewPipeline(util.Logger, loadConfig(testOutputYaml), nil, prometheus.DefaultRegisterer)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -77,4 +77,35 @@ func TestOutputValidation(t *testing.T) {
 	}
 }
 
-//TODO test label processing
+func TestOutputStage_Process(t *testing.T) {
+	tests := map[string]struct {
+		config         OutputConfig
+		extracted      map[string]interface{}
+		expectedOutput string
+	}{
+		"sets output": {
+			OutputConfig{
+				Source: "out",
+			},
+			map[string]interface{}{
+				"something": "notimportant",
+				"out":       "outmessage",
+			},
+			"outmessage",
+		},
+	}
+	for name, test := range tests {
+		test := test
+		t.Run(name, func(t *testing.T) {
+			t.Parallel()
+			st, err := newOutputStage(util.Logger, test.config)
+			if err != nil {
+				t.Fatal(err)
+			}
+			lbls := model.LabelSet{}
+			entry := "replaceme"
+			st.Process(lbls, test.extracted, nil, &entry)
+			assert.Equal(t, test.expectedOutput, entry)
+		})
+	}
+}
diff --git a/pkg/logentry/stages/pipeline.go b/pkg/logentry/stages/pipeline.go
index 87114ab7..636fbb92 100644
--- a/pkg/logentry/stages/pipeline.go
+++ b/pkg/logentry/stages/pipeline.go
@@ -7,21 +7,11 @@ import (
 	"github.com/go-kit/kit/log/level"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/common/model"
 
 	"github.com/grafana/loki/pkg/promtail/api"
 )
 
-var (
-	pipelineDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
-		Namespace: "logentry",
-		Name:      "pipeline_duration_seconds",
-		Help:      "Label and metric extraction pipeline processing time, in seconds",
-		Buckets:   []float64{.000005, .000010, .000025, .000050, .000100, .000250, .000500, .001000, .002500, .005000, .010000, .025000},
-	}, []string{"job_name"})
-)
-
 // PipelineStages contains configuration for each stage within a pipeline
 type PipelineStages = []interface{}
 
@@ -30,13 +20,30 @@ type PipelineStage = map[interface{}]interface{}
 
 // Pipeline pass down a log entry to each stage for mutation and/or label extraction.
 type Pipeline struct {
-	logger  log.Logger
-	stages  []Stage
-	jobName string
+	logger     log.Logger
+	stages     []Stage
+	jobName    *string
+	plDuration *prometheus.HistogramVec
 }
 
 // NewPipeline creates a new log entry pipeline from a configuration
-func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string, registerer prometheus.Registerer) (*Pipeline, error) {
+func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, registerer prometheus.Registerer) (*Pipeline, error) {
+	hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "logentry",
+		Name:      "pipeline_duration_seconds",
+		Help:      "Label and metric extraction pipeline processing time, in seconds",
+		Buckets:   []float64{.000005, .000010, .000025, .000050, .000100, .000250, .000500, .001000, .002500, .005000, .010000, .025000},
+	}, []string{"job_name"})
+	err := registerer.Register(hist)
+	if err != nil {
+		if existing, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			hist = existing.ExistingCollector.(*prometheus.HistogramVec)
+		} else {
+			// Same behavior as MustRegister if the error is not for AlreadyRegistered
+			panic(err)
+		}
+	}
+
 	st := []Stage{}
 	for _, s := range stgs {
 		stage, ok := s.(PipelineStage)
@@ -60,9 +67,10 @@ func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string, registe
 		}
 	}
 	return &Pipeline{
-		logger:  log.With(logger, "component", "pipeline"),
-		stages:  st,
-		jobName: jobName,
+		logger:     log.With(logger, "component", "pipeline"),
+		stages:     st,
+		jobName:    jobName,
+		plDuration: hist,
 	}, nil
 }
 
@@ -75,7 +83,9 @@ func (p *Pipeline) Process(labels model.LabelSet, extracted map[string]interface
 	}
 	dur := time.Since(start).Seconds()
 	level.Debug(p.logger).Log("msg", "finished processing log line", "labels", labels, "time", ts, "entry", entry, "duration_s", dur)
-	pipelineDuration.WithLabelValues(p.jobName).Observe(dur)
+	if p.jobName != nil {
+		p.plDuration.WithLabelValues(*p.jobName).Observe(dur)
+	}
 }
 
 // Wrap implements EntryMiddleware
diff --git a/pkg/logentry/stages/pipeline_test.go b/pkg/logentry/stages/pipeline_test.go
index b323e628..7ecf8719 100644
--- a/pkg/logentry/stages/pipeline_test.go
+++ b/pkg/logentry/stages/pipeline_test.go
@@ -24,7 +24,6 @@ var (
 var testYaml = `
 pipeline_stages:
 - match:
-    pipeline_name: "test_pipeline"
     selector: "{match=\"true\"}"
     stages:
     - docker:
@@ -49,7 +48,7 @@ func loadConfig(yml string) PipelineStages {
 
 func TestNewPipeline(t *testing.T) {
 
-	p, err := NewPipeline(util.Logger, loadConfig(testYaml), "test", prometheus.DefaultRegisterer)
+	p, err := NewPipeline(util.Logger, loadConfig(testYaml), nil, prometheus.DefaultRegisterer)
 	if err != nil {
 		panic(err)
 	}
@@ -67,7 +66,7 @@ func TestPipeline_MultiStage(t *testing.T) {
 	if err != nil {
 		panic(err)
 	}
-	p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), "test", prometheus.DefaultRegisterer)
+	p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), nil, prometheus.DefaultRegisterer)
 	if err != nil {
 		panic(err)
 	}
@@ -155,7 +154,7 @@ func BenchmarkPipeline(b *testing.B) {
 	}
 	for _, bm := range benchmarks {
 		b.Run(bm.name, func(b *testing.B) {
-			pl, err := NewPipeline(bm.logger, bm.stgs, "test", prometheus.DefaultRegisterer)
+			pl, err := NewPipeline(bm.logger, bm.stgs, nil, prometheus.DefaultRegisterer)
 			if err != nil {
 				panic(err)
 			}
diff --git a/pkg/logentry/stages/regex.go b/pkg/logentry/stages/regex.go
index c459194e..10d8cc2f 100644
--- a/pkg/logentry/stages/regex.go
+++ b/pkg/logentry/stages/regex.go
@@ -18,6 +18,7 @@ const (
 	ErrEmptyRegexStageConfig = "empty regex stage configuration"
 )
 
+// RegexConfig contains a regexStage configuration
 type RegexConfig struct {
 	Expression string `mapstructure:"expression"`
 }
@@ -40,14 +41,14 @@ func validateRegexConfig(c *RegexConfig) (*regexp.Regexp, error) {
 	return expr, nil
 }
 
-// regexStage mutates log entries using regex
+// regexStage sets extracted data using regular expressions
 type regexStage struct {
 	cfg        *RegexConfig
 	expression *regexp.Regexp
 	logger     log.Logger
 }
 
-// newRegexStage creates a new regular expression Mutator.
+// newRegexStage creates a newRegexStage
 func newRegexStage(logger log.Logger, config interface{}) (Stage, error) {
 	cfg, err := parseRegexConfig(config)
 	if err != nil {
@@ -64,6 +65,7 @@ func newRegexStage(logger log.Logger, config interface{}) (Stage, error) {
 	}, nil
 }
 
+// parseRegexConfig processes an incoming configuration into a RegexConfig
 func parseRegexConfig(config interface{}) (*RegexConfig, error) {
 	cfg := &RegexConfig{}
 	err := mapstructure.Decode(config, cfg)
diff --git a/pkg/logentry/stages/regex_test.go b/pkg/logentry/stages/regex_test.go
index a33d2f9f..830bc015 100644
--- a/pkg/logentry/stages/regex_test.go
+++ b/pkg/logentry/stages/regex_test.go
@@ -129,7 +129,7 @@ func TestRegexParser_Parse(t *testing.T) {
 		tt := tt
 		t.Run(tName, func(t *testing.T) {
 			t.Parallel()
-			p, err := New(util.Logger, "test", StageTypeRegex, tt.config, nil)
+			p, err := New(util.Logger, nil, StageTypeRegex, tt.config, nil)
 			if err != nil {
 				t.Fatalf("failed to create regex parser: %s", err)
 			}
@@ -171,7 +171,7 @@ func BenchmarkRegexStage(b *testing.B) {
 	}
 	for _, bm := range benchmarks {
 		b.Run(bm.name, func(b *testing.B) {
-			stage, err := New(util.Logger, "test", StageTypeRegex, bm.config, nil)
+			stage, err := New(util.Logger, nil, StageTypeRegex, bm.config, nil)
 			if err != nil {
 				panic(err)
 			}
diff --git a/pkg/logentry/stages/stage.go b/pkg/logentry/stages/stage.go
index 939da50b..f5f2d7f5 100644
--- a/pkg/logentry/stages/stage.go
+++ b/pkg/logentry/stages/stage.go
@@ -36,18 +36,18 @@ func (s StageFunc) Process(labels model.LabelSet, extracted map[string]interface
 }
 
 // New creates a new stage for the given type and configuration.
-func New(logger log.Logger, jobName string, stageType string,
+func New(logger log.Logger, jobName *string, stageType string,
 	cfg interface{}, registerer prometheus.Registerer) (Stage, error) {
 	var s Stage
 	var err error
 	switch stageType {
 	case StageTypeDocker:
-		s, err = NewDocker(logger, jobName, registerer)
+		s, err = NewDocker(logger, registerer)
 		if err != nil {
 			return nil, err
 		}
 	case StageTypeCRI:
-		s, err = NewCRI(logger, jobName, registerer)
+		s, err = NewCRI(logger, registerer)
 		if err != nil {
 			return nil, err
 		}
@@ -62,27 +62,27 @@ func New(logger log.Logger, jobName string, stageType string,
 			return nil, err
 		}
 	case StageTypeMetric:
-		s, err = newMetric(logger, cfg, registerer)
+		s, err = newMetricStage(logger, cfg, registerer)
 		if err != nil {
 			return nil, err
 		}
 	case StageTypeLabel:
-		s, err = newLabel(logger, cfg)
+		s, err = newLabelStage(logger, cfg)
 		if err != nil {
 			return nil, err
 		}
 	case StageTypeTimestamp:
-		s, err = newTimestamp(logger, cfg)
+		s, err = newTimestampStage(logger, cfg)
 		if err != nil {
 			return nil, err
 		}
 	case StageTypeOutput:
-		s, err = newOutput(logger, cfg)
+		s, err = newOutputStage(logger, cfg)
 		if err != nil {
 			return nil, err
 		}
 	case StageTypeMatch:
-		s, err = newMatcherStage(logger, cfg, registerer)
+		s, err = newMatcherStage(logger, jobName, cfg, registerer)
 		if err != nil {
 			return nil, err
 		}
diff --git a/pkg/logentry/stages/timestamp.go b/pkg/logentry/stages/timestamp.go
index 8ea2163b..035288b1 100644
--- a/pkg/logentry/stages/timestamp.go
+++ b/pkg/logentry/stages/timestamp.go
@@ -23,6 +23,7 @@ type TimestampConfig struct {
 	Format string `mapstructure:"format"`
 }
 
+// validateTimestampConfig validates a timestampStage configuration
 func validateTimestampConfig(cfg *TimestampConfig) (string, error) {
 	if cfg == nil {
 		return "", errors.New(ErrEmptyTimestampStageConfig)
@@ -37,8 +38,8 @@ func validateTimestampConfig(cfg *TimestampConfig) (string, error) {
 
 }
 
-// newTimestamp creates a new timestamp extraction pipeline stage.
-func newTimestamp(logger log.Logger, config interface{}) (*timestampStage, error) {
+// newTimestampStage creates a new timestamp extraction pipeline stage.
+func newTimestampStage(logger log.Logger, config interface{}) (*timestampStage, error) {
 	cfg := &TimestampConfig{}
 	err := mapstructure.Decode(config, cfg)
 	if err != nil {
@@ -55,12 +56,14 @@ func newTimestamp(logger log.Logger, config interface{}) (*timestampStage, error
 	}, nil
 }
 
+// timestampStage will set the timestamp using extracted data
 type timestampStage struct {
 	cfgs   *TimestampConfig
 	logger log.Logger
 	format string
 }
 
+// Process implements Stage
 func (ts *timestampStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
 	if ts.cfgs == nil {
 		return
diff --git a/pkg/logentry/stages/timestamp_test.go b/pkg/logentry/stages/timestamp_test.go
index 97b35bd9..a9b32dd6 100644
--- a/pkg/logentry/stages/timestamp_test.go
+++ b/pkg/logentry/stages/timestamp_test.go
@@ -23,7 +23,7 @@ pipeline_stages:
 
 var testTimestampLogLine = `
 {
-	"time":"2012-11-01T22:08:41+00:00",
+	"time":"2012-11-01T22:08:41-04:00",
 	"app":"loki",
 	"component": ["parser","type"],
 	"level" : "WARN"
@@ -31,7 +31,7 @@ var testTimestampLogLine = `
 `
 
 func TestTimestampPipeline(t *testing.T) {
-	pl, err := NewPipeline(util.Logger, loadConfig(testTimestampYaml), "test", prometheus.DefaultRegisterer)
+	pl, err := NewPipeline(util.Logger, loadConfig(testTimestampYaml), nil, prometheus.DefaultRegisterer)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -40,7 +40,7 @@ func TestTimestampPipeline(t *testing.T) {
 	entry := testTimestampLogLine
 	extracted := map[string]interface{}{}
 	pl.Process(lbls, extracted, &ts, &entry)
-	assert.Equal(t, time.Date(2012, 11, 01, 22, 8, 41, 0, time.FixedZone("", 0)), ts)
+	assert.Equal(t, time.Date(2012, 11, 01, 22, 8, 41, 0, time.FixedZone("", -4*60*60)), ts)
 }
 
 func TestTimestampValidation(t *testing.T) {
@@ -100,4 +100,37 @@ func TestTimestampValidation(t *testing.T) {
 	}
 }
 
-//TODO process tests
+func TestTimestampStage_Process(t *testing.T) {
+	tests := map[string]struct {
+		config    TimestampConfig
+		extracted map[string]interface{}
+		expected  time.Time
+	}{
+		"set success": {
+			TimestampConfig{
+				Source: "ts",
+				Format: time.RFC3339,
+			},
+			map[string]interface{}{
+				"somethigelse": "notimportant",
+				"ts":           "2106-01-02T23:04:05-04:00",
+			},
+			time.Date(2106, 01, 02, 23, 04, 05, 0, time.FixedZone("", -4*60*60)),
+		},
+	}
+	for name, test := range tests {
+		test := test
+		t.Run(name, func(t *testing.T) {
+			t.Parallel()
+			st, err := newTimestampStage(util.Logger, test.config)
+			if err != nil {
+				t.Fatal(err)
+			}
+			ts := time.Now()
+			lbls := model.LabelSet{}
+			st.Process(lbls, test.extracted, &ts, nil)
+			assert.Equal(t, test.expected, ts)
+
+		})
+	}
+}
diff --git a/pkg/logentry/stages/util.go b/pkg/logentry/stages/util.go
index 441a7ead..baa13a18 100644
--- a/pkg/logentry/stages/util.go
+++ b/pkg/logentry/stages/util.go
@@ -34,6 +34,7 @@ func convertDateLayout(predef string) string {
 	}
 }
 
+// getString will convert the input variable to a string if possible
 func getString(unk interface{}) (string, error) {
 
 	switch i := unk.(type) {
diff --git a/pkg/promtail/targets/filetargetmanager.go b/pkg/promtail/targets/filetargetmanager.go
index 7d8e4171..3419a1eb 100644
--- a/pkg/promtail/targets/filetargetmanager.go
+++ b/pkg/promtail/targets/filetargetmanager.go
@@ -77,7 +77,7 @@ func NewFileTargetManager(
 	config := map[string]sd_config.ServiceDiscoveryConfig{}
 	for _, cfg := range scrapeConfigs {
 		registerer := prometheus.DefaultRegisterer
-		pipeline, err := stages.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, cfg.JobName, registerer)
+		pipeline, err := stages.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, &cfg.JobName, registerer)
 		if err != nil {
 			return nil, err
 		}
@@ -87,14 +87,14 @@ func NewFileTargetManager(
 			switch cfg.EntryParser {
 			case api.CRI:
 				level.Warn(logger).Log("msg", "WARNING!!! entry_parser config is deprecated, please change to pipeline_stages")
-				cri, err := stages.NewCRI(logger, cfg.JobName, registerer)
+				cri, err := stages.NewCRI(logger, registerer)
 				if err != nil {
 					return nil, err
 				}
 				pipeline.AddStage(cri)
 			case api.Docker:
 				level.Warn(logger).Log("msg", "WARNING!!! entry_parser config is deprecated, please change to pipeline_stages")
-				docker, err := stages.NewDocker(logger, cfg.JobName, registerer)
+				docker, err := stages.NewDocker(logger, registerer)
 				if err != nil {
 					return nil, err
 				}
-- 
GitLab