diff --git a/README.md b/README.md
index fc4c72a9d242ebbb9ec2ae1a755f44d75456c990..5703f7291a558f9e1069ac2767eeb0b13b4ea2e5 100644
--- a/README.md
+++ b/README.md
@@ -36,7 +36,7 @@ Once you have promtail, Loki, and Grafana running, continue with [our usage docs
 
 - [API documentation](./docs/api.md) for alternative ways of getting logs into Loki.
 - [Operations](./docs/operations.md) for important aspects of running Loki.
-- [Promtail](./docs/promtail-setup.md) on how to configure the agent that tails your logs.
+- [Promtail](./docs/promtail.md) is an agent which can tail your log files and push them to Loki.
 - [Logcli](./docs/logcli.md) on how to query your logs without Grafana.
 - [Troubleshooting](./docs/troubleshooting.md) for help around frequent error messages.
 - [Usage](./docs/usage.md) for how to set up a Loki datasource in Grafana and query your logs.
@@ -55,12 +55,16 @@ Your feedback is always welcome.
 ## Further Reading
 
 - The original [design doc](https://docs.google.com/document/d/11tjK_lvp1-SVsFZjgOTr1vV3-q6vBAsZYIQ5ZeYBkyM/view) for Loki is a good source for discussion of the motivation and design decisions.
+- Callum Styan's March 2019 DevOpsDays Vancouver talk "[Grafana Loki: Log Aggregation for Incident Investigations][devopsdays19-talk]".
+- Grafana Labs blog post "[How We Designed Loki to Work Easily Both as Microservices and as Monoliths][architecture-blog]".
 - Julien Garcia Gonzalez' March 2019 blog post "[Grafana Logging using Loki][giant-swarm-blog]".
 - Tom Wilkie's early-2019 CNCF Paris/FODEM talk "[Grafana Loki: like Prometheus, but for logs][fosdem19-talk]" ([slides][fosdem19-slides], [video][fosdem19-video]).
 - David Kaltschmidt's KubeCon 2018 talk "[On the OSS Path to Full Observability with Grafana][kccna18-event]" ([slides][kccna18-slides], [video][kccna18-video]) on how Loki fits into a cloud-native environment.
 - Goutham Veeramachaneni's blog post "[Loki: Prometheus-inspired, open source logging for cloud natives](https://grafana.com/blog/2018/12/12/loki-prometheus-inspired-open-source-logging-for-cloud-natives/)" on details of the Loki architectire.
 - David Kaltschmidt's blog post "[Closer look at Grafana's user interface for Loki](https://grafana.com/blog/2019/01/02/closer-look-at-grafanas-user-interface-for-loki/)" on the ideas that went into the logging user interface.
 
+[devopsdays19-talk]: https://grafana.com/blog/2019/05/06/how-loki-correlates-metrics-and-logs----and-saves-you-money/
+[architecture-blog]: https://grafana.com/blog/2019/04/15/how-we-designed-loki-to-work-easily-both-as-microservices-and-as-monoliths/
 [giant-swarm-blog]: https://blog.giantswarm.io/grafana-logging-using-loki
 [fosdem19-talk]: https://fosdem.org/2019/schedule/event/loki_prometheus_for_logs/
 [fosdem19-slides]: https://speakerdeck.com/grafana/grafana-loki-like-prometheus-but-for-logs
diff --git a/cmd/promtail/main.go b/cmd/promtail/main.go
index d5c34433c214da13d6e5d0fd357e1005a6ed0c40..ba9fb026d9e98dc8aab9a6fd05891d6b4660c3da 100644
--- a/cmd/promtail/main.go
+++ b/cmd/promtail/main.go
@@ -37,6 +37,9 @@ func main() {
 		}
 	}
 
+	// Re-init the logger which will now honor a different log level set in ServerConfig.Config
+	util.InitLogger(&config.ServerConfig.Config)
+
 	p, err := promtail.New(config)
 	if err != nil {
 		level.Error(util.Logger).Log("msg", "error creating promtail", "error", err)
diff --git a/docs/promtail-examples.md b/docs/promtail-examples.md
index dda607add244b426ebca49c2eaea99ed4a9d4e0d..47526922367bdab47835a156eb918d93fc42023f 100644
--- a/docs/promtail-examples.md
+++ b/docs/promtail-examples.md
@@ -1,7 +1,12 @@
-# promtail examples
-#### In this file you can see simple examples of configure promtail
+# Promtail Config Examples
 
-* This example of config promtail based on original docker [config](https://github.com/grafana/loki/blob/master/cmd/promtail/promtail-docker-config.yaml)
+## Pipeline Examples
+
+TODO Need pipeline examples
+
+## Simple Docker Config
+
+This example of config promtail based on original docker [config](https://github.com/grafana/loki/blob/master/cmd/promtail/promtail-docker-config.yaml)
 and show how work with 2 and more sources:
 
 Filename for example: my-docker-config.yaml
@@ -38,18 +43,19 @@ scrape_configs:
       __path__: /srv/log/someone_service/*.log
 
 ```
-##### Description
-Scrape_config section of config.yaml contents are various jobs for parsing your logs on current host
+#### Description
+
+Scrape_config section of config.yaml contents contains various jobs for parsing your logs
 
-`job` and `host` these are tags on which you can filter parsed logs date on Grafana later
+`job` and `host` are examples of static labels added to all logs, labels are indexed by Loki and are used to help search logs.
 
 `__path__` it is path to directory where stored your logs.
 
 If you run promtail and this config.yaml in Docker container, don't forget use docker volumes for mapping real directories
 with log to those folders in the container. 
 
-* See next example of Dockerfile, who use our modified promtail config (my-docker-config.yaml)
-1) Create folder, for example `promtail`, then new folder build and in this filder conf and place there `my-docker-config.yaml`.
+#### Example Use
+1) Create folder, for example `promtail`, then new sub directory `build/conf` and place there `my-docker-config.yaml`.
 2) Create new Dockerfile in root folder `promtail`, with contents
 ```
 FROM grafana/promtail:latest
diff --git a/docs/promtail-setup.md b/docs/promtail-setup.md
index 3312bd7c5a390a64e13ec5c8113905840bb40c3e..cd171c8cc9d1b765c0ac375832d0453f9b6b7e17 100644
--- a/docs/promtail-setup.md
+++ b/docs/promtail-setup.md
@@ -1,9 +1,5 @@
 # Promtail Setups
 
-## Design Documentation
-
-* [Extracting labels from logs](./design/labels.md)
-
 ## Daemonset method
 
 Daemonset will deploy promtail on every node within the kubernetes cluster.
@@ -12,6 +8,8 @@ Daemonset deployment is great to collect all of the container logs within the
 cluster. It is great solution for single tenant.  All of the logs will send to a
 single Loki server.
 
+Check the `production` folder for examples of a daemonset deployment for kubernetes using both helm and ksonnet.
+
 ### Example
 ```yaml
 ---Daemonset.yaml
diff --git a/docs/promtail.md b/docs/promtail.md
index 783dd61d87c317ae8aae418cab581c09c99f6c6a..ff8b3bc9cc5c89e6f4dfb0b02ae1d5e2453edaeb 100644
--- a/docs/promtail.md
+++ b/docs/promtail.md
@@ -1,3 +1,14 @@
+# Promtail
+
+* [Deployment Methods](./promtail-setup.md)
+* [Config and Usage Examples](./promtail-examples.md)
+* [Troubleshooting](./troubleshooting.md)
+
+
+## Design Documentation
+   
+   * [Extracting labels from logs](./design/labels.md)
+
 ## Promtail and scrape_configs
 
 Promtail is an agent which reads log files and sends streams of log data to
diff --git a/docs/troubleshooting.md b/docs/troubleshooting.md
index 5d09932f1f1b6f63e98b4ba740d0394aa2f32319..59046ed9477b947c2cf874fdc8de7d81521e6903 100644
--- a/docs/troubleshooting.md
+++ b/docs/troubleshooting.md
@@ -46,7 +46,7 @@ The promtail configuration contains a `__path__` entry to a directory that promt
 
 ## Connecting to a promtail pod to troubleshoot
 
-Say you are missing logs from your nginx pod and want to investigate promtail.
+First check *Troubleshooting targets* section above, if that doesn't help answer your questions you can connect to the promtail pod to further investigate.
 
 In your cluster if you are running promtail as a daemonset, you will have a promtail pod on each node, to figure out which promtail you want run:
 
diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go
index 7bdeae54933df725d705f838b4e1d8782c7258f2..d056d25d042a4f42c972f41b57d98d2c3ea381bc 100644
--- a/pkg/ingester/flush.go
+++ b/pkg/ingester/flush.go
@@ -196,17 +196,20 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint
 	var result []*chunkDesc
 	for j := range stream.chunks {
 		if immediate || i.shouldFlushChunk(&stream.chunks[j]) {
-			result = append(result, &stream.chunks[j])
+			// Ensure no more writes happen to this chunk.
+			if !stream.chunks[j].closed {
+				stream.chunks[j].closed = true
+			}
+			// Flush this chunk if it hasn't already been successfully flushed.
+			if stream.chunks[j].flushed.IsZero() {
+				result = append(result, &stream.chunks[j])
+			}
 		}
 	}
 	return result, stream.labels
 }
 
 func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) bool {
-	if !chunk.flushed.IsZero() {
-		return false
-	}
-
 	// Append should close the chunk when the a new one is added.
 	if chunk.closed {
 		return true
diff --git a/pkg/logentry/pipeline.go b/pkg/logentry/pipeline.go
new file mode 100644
index 0000000000000000000000000000000000000000..5c0c19312aa130529323e2960539aa91f9dce2f5
--- /dev/null
+++ b/pkg/logentry/pipeline.go
@@ -0,0 +1,108 @@
+package logentry
+
+import (
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/model"
+
+	"github.com/grafana/loki/pkg/logentry/stages"
+	"github.com/grafana/loki/pkg/promtail/api"
+)
+
+// PipelineStages contains configuration for each stage within a pipeline
+type PipelineStages []interface{}
+
+// Pipeline pass down a log entry to each stage for mutation and/or label extraction.
+type Pipeline struct {
+	logger log.Logger
+	stages []stages.Stage
+	plObs  *prometheus.Observer
+}
+
+// NewPipeline creates a new log entry pipeline from a configuration
+func NewPipeline(logger log.Logger, stgs PipelineStages, plObserverMicroSeconds *prometheus.Observer) (*Pipeline, error) {
+	st := []stages.Stage{}
+	for _, s := range stgs {
+		stage, ok := s.(map[interface{}]interface{})
+		if !ok {
+			return nil, errors.Errorf("invalid YAML config, "+
+				"make sure each stage of your pipeline is a YAML object (must end with a `:`), check stage `- %s`", s)
+		}
+		if len(stage) > 1 {
+			return nil, errors.New("pipeline stage must contain only one key")
+		}
+		for key, config := range stage {
+			name, ok := key.(string)
+			if !ok {
+				return nil, errors.New("pipeline stage key must be a string")
+			}
+			switch name {
+			case "json":
+				json, err := stages.NewJSON(logger, config)
+				if err != nil {
+					return nil, errors.Wrap(err, "invalid json stage config")
+				}
+				st = append(st, json)
+			case "regex":
+				regex, err := stages.NewRegex(logger, config)
+				if err != nil {
+					return nil, errors.Wrap(err, "invalid regex stage config")
+				}
+				st = append(st, regex)
+			case "docker":
+				docker, err := stages.NewDocker(logger)
+				if err != nil {
+					return nil, errors.Wrap(err, "invalid docker stage config")
+				}
+				st = append(st, docker)
+			case "cri":
+				cri, err := stages.NewCRI(logger)
+				if err != nil {
+					return nil, errors.Wrap(err, "invalid cri stage config")
+				}
+				st = append(st, cri)
+			}
+		}
+	}
+	return &Pipeline{
+		logger: log.With(logger, "component", "pipeline"),
+		stages: st,
+		plObs:  plObserverMicroSeconds,
+	}, nil
+}
+
+// Process mutates an entry and its metadata by using multiple configure stage.
+func (p *Pipeline) Process(labels model.LabelSet, ts *time.Time, entry *string) {
+	start := time.Now()
+	for i, stage := range p.stages {
+		level.Debug(p.logger).Log("msg", "processing pipeline", "stage", i, "labels", labels, "time", ts, "entry", entry)
+		stage.Process(labels, ts, entry)
+	}
+	durUs := float64(time.Since(start).Nanoseconds()) / 1000
+	level.Debug(p.logger).Log("msg", "finished processing log line", "labels", labels, "time", ts, "entry", entry, "duration_us", durUs)
+	if p.plObs != nil {
+		(*p.plObs).Observe(durUs)
+	}
+}
+
+// Wrap implements EntryMiddleware
+func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
+	return api.EntryHandlerFunc(func(labels model.LabelSet, timestamp time.Time, line string) error {
+		p.Process(labels, &timestamp, &line)
+		return next.Handle(labels, timestamp, line)
+	})
+}
+
+// AddStage adds a stage to the pipeline
+func (p *Pipeline) AddStage(stage stages.Stage) {
+	p.stages = append(p.stages, stage)
+}
+
+// Size gets the current number of stages in the pipeline
+func (p *Pipeline) Size() int {
+	return len(p.stages)
+}
diff --git a/pkg/logentry/pipeline_test.go b/pkg/logentry/pipeline_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..322db9b2c268a9cadbb54d1ab889d56eb554e962
--- /dev/null
+++ b/pkg/logentry/pipeline_test.go
@@ -0,0 +1,153 @@
+package logentry
+
+import (
+	"testing"
+	"time"
+
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/assert"
+
+	"gopkg.in/yaml.v2"
+)
+
+var rawTestLine = `{"log":"11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] \"GET /1986.js HTTP/1.1\" 200 932 \"-\" \"Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6\"","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z"}`
+var processedTestLine = `11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] "GET /1986.js HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"`
+
+var testYaml = `
+pipeline_stages:
+- docker:
+- regex:
+    expression: "^(?P<ip>\\S+) (?P<identd>\\S+) (?P<user>\\S+) \\[(?P<timestamp>[\\w:/]+\\s[+\\-]\\d{4})\\] \"(?P<action>\\S+)\\s?(?P<path>\\S+)?\\s?(?P<protocol>\\S+)?\" (?P<status>\\d{3}|-) (?P<size>\\d+|-)\\s?\"?(?P<referer>[^\"]*)\"?\\s?\"?(?P<useragent>[^\"]*)?\"?$"
+    timestamp:
+      source: timestamp
+      format: "02/Jan/2006:15:04:05 -0700"
+    labels:
+      action:
+        source: "action"
+      status_code:
+        source: "status"
+`
+
+func loadConfig(yml string) PipelineStages {
+	var config map[string]interface{}
+	err := yaml.Unmarshal([]byte(yml), &config)
+	if err != nil {
+		panic(err)
+	}
+	return config["pipeline_stages"].([]interface{})
+}
+
+func TestNewPipeline(t *testing.T) {
+
+	p, err := NewPipeline(util.Logger, loadConfig(testYaml), nil)
+	if err != nil {
+		panic(err)
+	}
+	if len(p.stages) != 2 {
+		t.Fatal("missing stages")
+	}
+}
+
+func TestPipeline_MultiStage(t *testing.T) {
+	est, err := time.LoadLocation("America/New_York")
+	if err != nil {
+		t.Fatal("could not parse timestamp", err)
+	}
+
+	var config map[string]interface{}
+	err = yaml.Unmarshal([]byte(testYaml), &config)
+	if err != nil {
+		panic(err)
+	}
+	p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), nil)
+	if err != nil {
+		panic(err)
+	}
+
+	tests := map[string]struct {
+		entry          string
+		expectedEntry  string
+		t              time.Time
+		expectedT      time.Time
+		labels         model.LabelSet
+		expectedLabels model.LabelSet
+	}{
+		"happy path": {
+			rawTestLine,
+			processedTestLine,
+			time.Now(),
+			time.Date(2000, 01, 25, 14, 00, 01, 0, est),
+			map[model.LabelName]model.LabelValue{
+				"test": "test",
+			},
+			map[model.LabelName]model.LabelValue{
+				"test":        "test",
+				"stream":      "stderr",
+				"action":      "GET",
+				"status_code": "200",
+			},
+		},
+	}
+
+	for tName, tt := range tests {
+		tt := tt
+		t.Run(tName, func(t *testing.T) {
+			t.Parallel()
+
+			p.Process(tt.labels, &tt.t, &tt.entry)
+
+			assert.Equal(t, tt.expectedLabels, tt.labels, "did not get expected labels")
+			assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry")
+			if tt.t.Unix() != tt.expectedT.Unix() {
+				t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t)
+			}
+		})
+	}
+}
+
+var (
+	l = log.NewNopLogger()
+	//w = log.NewSyncWriter(os.Stdout)
+	//l = log.NewLogfmtLogger(w)
+	infoLogger  = level.NewFilter(l, level.AllowInfo())
+	debugLogger = level.NewFilter(l, level.AllowDebug())
+)
+
+func Benchmark(b *testing.B) {
+	benchmarks := []struct {
+		name   string
+		stgs   PipelineStages
+		logger log.Logger
+		entry  string
+	}{
+		{
+			"two stage info level",
+			loadConfig(testYaml),
+			infoLogger,
+			rawTestLine,
+		},
+		{
+			"two stage debug level",
+			loadConfig(testYaml),
+			debugLogger,
+			rawTestLine,
+		},
+	}
+	for _, bm := range benchmarks {
+		b.Run(bm.name, func(b *testing.B) {
+			pl, err := NewPipeline(bm.logger, bm.stgs, nil)
+			if err != nil {
+				panic(err)
+			}
+			lb := model.LabelSet{}
+			ts := time.Now()
+			for i := 0; i < b.N; i++ {
+				entry := bm.entry
+				pl.Process(lb, &ts, &entry)
+			}
+		})
+	}
+}
diff --git a/pkg/logentry/stages/extensions.go b/pkg/logentry/stages/extensions.go
new file mode 100644
index 0000000000000000000000000000000000000000..a99c2be56890152c241c401997c7a52649aa4c5b
--- /dev/null
+++ b/pkg/logentry/stages/extensions.go
@@ -0,0 +1,44 @@
+package stages
+
+import (
+	"github.com/go-kit/kit/log"
+)
+
+// NewDocker creates a Docker json log format specific pipeline stage.
+func NewDocker(logger log.Logger) (Stage, error) {
+	config := map[string]interface{}{
+		"timestamp": map[string]interface{}{
+			"source": "time",
+			"format": "RFC3339",
+		},
+		"labels": map[string]interface{}{
+			"stream": map[string]interface{}{
+				"source": "stream",
+			},
+		},
+		"output": map[string]interface{}{
+			"source": "log",
+		},
+	}
+	return NewJSON(logger, config)
+}
+
+// NewCRI creates a CRI format specific pipeline stage
+func NewCRI(logger log.Logger) (Stage, error) {
+	config := map[string]interface{}{
+		"expression": "^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<flags>\\S+?) (?P<content>.*)$",
+		"timestamp": map[string]interface{}{
+			"source": "time",
+			"format": "RFC3339Nano",
+		},
+		"labels": map[string]interface{}{
+			"stream": map[string]interface{}{
+				"source": "stream",
+			},
+		},
+		"output": map[string]interface{}{
+			"source": "content",
+		},
+	}
+	return NewRegex(logger, config)
+}
diff --git a/pkg/logentry/stages/extensions_test.go b/pkg/logentry/stages/extensions_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..da61e72016d2b791db66cec85f903fd2e4315e42
--- /dev/null
+++ b/pkg/logentry/stages/extensions_test.go
@@ -0,0 +1,157 @@
+package stages
+
+import (
+	"testing"
+	"time"
+
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/stretchr/testify/assert"
+)
+
+var (
+	dockerRaw       = `{"log":"level=info ts=2019-04-30T02:12:41.844179Z caller=filetargetmanager.go:180 msg=\"Adding target\" key=\"{com_docker_deploy_namespace=\\\"docker\\\", com_docker_fry=\\\"compose.api\\\", com_docker_image_tag=\\\"v0.4.12\\\", container_name=\\\"compose\\\", instance=\\\"compose-api-cbff6dfc9-cqfr8\\\", job=\\\"docker/compose-api\\\", namespace=\\\"docker\\\", pod_template_hash=\\\"769928975\\\"}\"\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z"}`
+	dockerProcessed = `level=info ts=2019-04-30T02:12:41.844179Z caller=filetargetmanager.go:180 msg="Adding target" key="{com_docker_deploy_namespace=\"docker\", com_docker_fry=\"compose.api\", com_docker_image_tag=\"v0.4.12\", container_name=\"compose\", instance=\"compose-api-cbff6dfc9-cqfr8\", job=\"docker/compose-api\", namespace=\"docker\", pod_template_hash=\"769928975\"}"
+`
+	dockerInvalidTimestampRaw = `{"log":"log message\n","stream":"stderr","time":"hi!"}`
+	dockerTestTimeNow         = time.Now()
+)
+
+func TestNewDocker(t *testing.T) {
+	loc, err := time.LoadLocation("UTC")
+	if err != nil {
+		t.Fatal("could not parse timezone", err)
+	}
+
+	tests := map[string]struct {
+		entry          string
+		expectedEntry  string
+		t              time.Time
+		expectedT      time.Time
+		labels         map[string]string
+		expectedLabels map[string]string
+	}{
+		"happy path": {
+			dockerRaw,
+			dockerProcessed,
+			time.Now(),
+			time.Date(2019, 4, 30, 02, 12, 41, 844351500, loc),
+			map[string]string{},
+			map[string]string{
+				"stream": "stderr",
+			},
+		},
+		"invalid timestamp": {
+			dockerInvalidTimestampRaw,
+			"log message\n",
+			dockerTestTimeNow,
+			dockerTestTimeNow,
+			map[string]string{},
+			map[string]string{
+				"stream": "stderr",
+			},
+		},
+		"invalid json": {
+			"i'm not json!",
+			"i'm not json!",
+			dockerTestTimeNow,
+			dockerTestTimeNow,
+			map[string]string{},
+			map[string]string{},
+		},
+	}
+
+	for tName, tt := range tests {
+		tt := tt
+		t.Run(tName, func(t *testing.T) {
+			t.Parallel()
+			p, err := NewDocker(util.Logger)
+			if err != nil {
+				t.Fatalf("failed to create Docker parser: %s", err)
+			}
+			lbs := toLabelSet(tt.labels)
+			p.Process(lbs, &tt.t, &tt.entry)
+
+			assertLabels(t, tt.expectedLabels, lbs)
+			assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry")
+			if tt.t.Unix() != tt.expectedT.Unix() {
+				t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t)
+			}
+		})
+	}
+}
+
+var (
+	criTestTimeStr = "2019-01-01T01:00:00.000000001Z"
+	criTestTime, _ = time.Parse(time.RFC3339Nano, criTestTimeStr)
+	criTestTime2   = time.Now()
+)
+
+func TestNewCri(t *testing.T) {
+	tests := map[string]struct {
+		entry          string
+		expectedEntry  string
+		t              time.Time
+		expectedT      time.Time
+		labels         map[string]string
+		expectedLabels map[string]string
+	}{
+		"happy path": {
+			criTestTimeStr + " stderr P message",
+			"message",
+			time.Now(),
+			criTestTime,
+			map[string]string{},
+			map[string]string{
+				"stream": "stderr",
+			},
+		},
+		"multi line pass": {
+			criTestTimeStr + " stderr P message\nmessage2",
+			"message\nmessage2",
+			time.Now(),
+			criTestTime,
+			map[string]string{},
+			map[string]string{
+				"stream": "stderr",
+			},
+		},
+		"invalid timestamp": {
+			"3242 stderr P message",
+			"message",
+			criTestTime2,
+			criTestTime2,
+			map[string]string{},
+			map[string]string{
+				"stream": "stderr",
+			},
+		},
+		"invalid line": {
+			"i'm invalid!!!",
+			"i'm invalid!!!",
+			criTestTime2,
+			criTestTime2,
+			map[string]string{},
+			map[string]string{},
+		},
+	}
+
+	for tName, tt := range tests {
+		tt := tt
+		t.Run(tName, func(t *testing.T) {
+			t.Parallel()
+			p, err := NewCRI(util.Logger)
+			if err != nil {
+				t.Fatalf("failed to create CRI parser: %s", err)
+			}
+			lbs := toLabelSet(tt.labels)
+			p.Process(lbs, &tt.t, &tt.entry)
+
+			assertLabels(t, tt.expectedLabels, lbs)
+			assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry")
+			if tt.t.Unix() != tt.expectedT.Unix() {
+				t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t)
+			}
+		})
+	}
+
+}
diff --git a/pkg/logentry/stages/json.go b/pkg/logentry/stages/json.go
new file mode 100644
index 0000000000000000000000000000000000000000..17e325e21ab81ef72ba28b6f5d77830ef5253dbb
--- /dev/null
+++ b/pkg/logentry/stages/json.go
@@ -0,0 +1,210 @@
+package stages
+
+import (
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/jmespath/go-jmespath"
+	"github.com/mitchellh/mapstructure"
+	"github.com/pkg/errors"
+	"github.com/prometheus/common/model"
+)
+
+// JSONTimestamp configures timestamp extraction
+type JSONTimestamp struct {
+	Source *string `mapstructure:"source"`
+	Format string  `mapstructure:"format"`
+}
+
+// JSONLabel configures a labels value extraction
+type JSONLabel struct {
+	Source *string `mapstructure:"source"`
+}
+
+// JSONOutput configures output value extraction
+type JSONOutput struct {
+	Source *string `mapstructure:"source"`
+}
+
+// JSONConfig configures the log entry parser to extract value from json
+type JSONConfig struct {
+	Timestamp *JSONTimestamp        `mapstructure:"timestamp"`
+	Output    *JSONOutput           `mapstructure:"output"`
+	Labels    map[string]*JSONLabel `mapstructure:"labels"`
+}
+
+func newJSONConfig(config interface{}) (*JSONConfig, error) {
+	cfg := &JSONConfig{}
+	err := mapstructure.Decode(config, cfg)
+	if err != nil {
+		return nil, err
+	}
+	return cfg, nil
+}
+
+// validate the config and returns a map of necessary jmespath expressions.
+func (c *JSONConfig) validate() (map[string]*jmespath.JMESPath, error) {
+	if c.Output == nil && len(c.Labels) == 0 && c.Timestamp == nil {
+		return nil, errors.New("empty json parser configuration")
+	}
+
+	if c.Output != nil && (c.Output.Source == nil || (c.Output.Source != nil && *c.Output.Source == "")) {
+		return nil, errors.New("output source value is required")
+	}
+
+	if c.Timestamp != nil {
+		if c.Timestamp.Source != nil && *c.Timestamp.Source == "" {
+			return nil, errors.New("timestamp source value is required")
+		}
+		if c.Timestamp.Format == "" {
+			return nil, errors.New("timestamp format is required")
+		}
+		c.Timestamp.Format = convertDateLayout(c.Timestamp.Format)
+	}
+
+	expressions := map[string]*jmespath.JMESPath{}
+	var err error
+	if c.Timestamp != nil && c.Timestamp.Source != nil {
+		expressions[*c.Timestamp.Source], err = jmespath.Compile(*c.Timestamp.Source)
+		if err != nil {
+			return nil, errors.Wrap(err, "could not compile timestamp source jmespath expression")
+		}
+	}
+	if c.Output != nil && c.Output.Source != nil {
+		expressions[*c.Output.Source], err = jmespath.Compile(*c.Output.Source)
+		if err != nil {
+			return nil, errors.Wrap(err, "could not compile output source jmespath expression")
+		}
+	}
+	for labelName, labelSrc := range c.Labels {
+		if !model.LabelName(labelName).IsValid() {
+			return nil, fmt.Errorf("invalid label name: %s", labelName)
+		}
+		if labelSrc != nil && labelSrc.Source != nil {
+			expressions[*labelSrc.Source], err = jmespath.Compile(*labelSrc.Source)
+			if err != nil {
+				return nil, errors.Wrapf(err, "could not compile label source jmespath expression: %s", labelName)
+			}
+		}
+	}
+	return expressions, nil
+}
+
+type jsonStage struct {
+	cfg         *JSONConfig
+	expressions map[string]*jmespath.JMESPath
+	logger      log.Logger
+}
+
+// NewJSON creates a new json stage from a config.
+func NewJSON(logger log.Logger, config interface{}) (Stage, error) {
+	cfg, err := newJSONConfig(config)
+	if err != nil {
+		return nil, err
+	}
+	expressions, err := cfg.validate()
+	if err != nil {
+		return nil, err
+	}
+	return &jsonStage{
+		cfg:         cfg,
+		expressions: expressions,
+		logger:      log.With(logger, "component", "parser", "type", "json"),
+	}, nil
+}
+
+func (j *jsonStage) getJSONString(expr *string, fallback string, data map[string]interface{}) (result string, ok bool) {
+	if expr == nil {
+		result, ok = data[fallback].(string)
+		if !ok {
+			level.Debug(j.logger).Log("msg", "field is not a string", "field", fallback)
+		}
+	} else {
+		var searchResult interface{}
+		searchResult, ok = j.getJSONValue(expr, data)
+		if !ok {
+			level.Debug(j.logger).Log("msg", "failed to search with jmespath expression", "expr", expr)
+			return
+		}
+		result, ok = searchResult.(string)
+		if !ok {
+			level.Debug(j.logger).Log("msg", "search result is not a string", "expr", *expr)
+		}
+	}
+	return
+}
+
+func (j *jsonStage) getJSONValue(expr *string, data map[string]interface{}) (result interface{}, ok bool) {
+	var err error
+	ok = true
+	result, err = j.expressions[*expr].Search(data)
+	if err != nil {
+		level.Debug(j.logger).Log("msg", "failed to search with jmespath expression", "expr", expr)
+		ok = false
+		return
+	}
+	return
+}
+
+// Process implement a pipeline stage
+func (j *jsonStage) Process(labels model.LabelSet, t *time.Time, entry *string) {
+	if entry == nil {
+		level.Debug(j.logger).Log("msg", "cannot parse a nil entry")
+		return
+	}
+	var data map[string]interface{}
+	if err := json.Unmarshal([]byte(*entry), &data); err != nil {
+		level.Debug(j.logger).Log("msg", "could not unmarshal json", "err", err)
+		return
+	}
+
+	// parsing ts
+	if j.cfg.Timestamp != nil {
+		if ts, ok := j.getJSONString(j.cfg.Timestamp.Source, "timestamp", data); ok {
+			parsedTs, err := time.Parse(j.cfg.Timestamp.Format, ts)
+			if err != nil {
+				level.Debug(j.logger).Log("msg", "failed to parse time", "err", err, "format", j.cfg.Timestamp.Format, "value", ts)
+			} else {
+				*t = parsedTs
+			}
+		}
+	}
+
+	// parsing labels
+	for lName, lSrc := range j.cfg.Labels {
+		var src *string
+		if lSrc != nil {
+			src = lSrc.Source
+		}
+		lValue, ok := j.getJSONString(src, lName, data)
+		if !ok {
+			continue
+		}
+		labelValue := model.LabelValue(lValue)
+		// seems impossible as the json.Unmarshal would yield an error first.
+		if !labelValue.IsValid() {
+			level.Debug(j.logger).Log("msg", "invalid label value parsed", "value", labelValue)
+			continue
+		}
+		labels[model.LabelName(lName)] = labelValue
+	}
+
+	// parsing output
+	if j.cfg.Output != nil {
+		if jsonObj, ok := j.getJSONValue(j.cfg.Output.Source, data); ok && jsonObj != nil {
+			if s, ok := jsonObj.(string); ok {
+				*entry = s
+				return
+			}
+			b, err := json.Marshal(jsonObj)
+			if err != nil {
+				level.Debug(j.logger).Log("msg", "could not marshal output value", "err", err)
+				return
+			}
+			*entry = string(b)
+		}
+	}
+}
diff --git a/pkg/logentry/stages/json_test.go b/pkg/logentry/stages/json_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..1d797dbc1e340327dc89204d0a350a2dd54b658b
--- /dev/null
+++ b/pkg/logentry/stages/json_test.go
@@ -0,0 +1,333 @@
+package stages
+
+import (
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/stretchr/testify/assert"
+	"gopkg.in/yaml.v2"
+)
+
+var cfg = `json: 
+  timestamp:
+    source: time
+    format: RFC3339
+  labels:
+    stream:
+      source: json_key_name.json_sub_key_name
+  output:
+    source: log`
+
+// nolint
+func TestYamlMapStructure(t *testing.T) {
+	t.Parallel()
+
+	// testing that we can use yaml data into mapstructure.
+	var mapstruct map[interface{}]interface{}
+	if err := yaml.Unmarshal([]byte(cfg), &mapstruct); err != nil {
+		t.Fatalf("error while un-marshalling config: %s", err)
+	}
+	p, ok := mapstruct["json"].(map[interface{}]interface{})
+	if !ok {
+		t.Fatalf("could not read parser %+v", mapstruct["json"])
+	}
+	got, err := newJSONConfig(p)
+	if err != nil {
+		t.Fatalf("could not create parser from yaml: %s", err)
+	}
+	want := &JSONConfig{
+		Labels: map[string]*JSONLabel{
+			"stream": {
+				Source: String("json_key_name.json_sub_key_name"),
+			},
+		},
+		Output: &JSONOutput{Source: String("log")},
+		Timestamp: &JSONTimestamp{
+			Format: "RFC3339",
+			Source: String("time"),
+		},
+	}
+	if !reflect.DeepEqual(got, want) {
+		t.Fatalf("want: %+v got: %+v", want, got)
+	}
+}
+
+func String(s string) *string {
+	return &s
+}
+
+func TestJSONConfig_validate(t *testing.T) {
+	t.Parallel()
+	tests := map[string]struct {
+		config        interface{}
+		wantExprCount int
+		wantErr       bool
+	}{
+		"empty": {
+			map[string]interface{}{},
+			0,
+			true,
+		},
+		"missing output info": {
+			map[string]interface{}{
+				"output": map[string]interface{}{},
+			},
+			0,
+			true,
+		},
+		"missing output source": {
+			map[string]interface{}{
+				"output": map[string]interface{}{
+					"source": "",
+				},
+			},
+			0,
+			true,
+		},
+		"invalid output source": {
+			map[string]interface{}{
+				"output": map[string]interface{}{
+					"source": "[",
+				},
+			},
+			0,
+			true,
+		},
+		"missing timestamp source": {
+			map[string]interface{}{
+				"timestamp": map[string]interface{}{
+					"source": "",
+					"format": "ANSIC",
+				},
+			},
+			0,
+			true,
+		},
+		"invalid timestamp source": {
+			map[string]interface{}{
+				"timestamp": map[string]interface{}{
+					"source": "[",
+					"format": "ANSIC",
+				},
+			},
+			0,
+			true,
+		},
+		"missing timestamp format": {
+			map[string]interface{}{
+				"timestamp": map[string]interface{}{
+					"source": "test",
+					"format": "",
+				},
+			},
+			0,
+			true,
+		},
+		"invalid label name": {
+			map[string]interface{}{
+				"labels": map[string]interface{}{
+					"": map[string]interface{}{},
+				},
+			},
+			0,
+			true,
+		},
+		"invalid label source": {
+			map[string]interface{}{
+				"labels": map[string]interface{}{
+					"stream": map[string]interface{}{
+						"source": "]",
+					},
+				},
+			},
+			0,
+			true,
+		},
+		"valid": {
+			map[string]interface{}{
+				"output": map[string]interface{}{
+					"source": "log.msg[0]",
+				},
+				"timestamp": map[string]interface{}{
+					"source": "log.ts",
+					"format": "RFC3339",
+				},
+				"labels": map[string]interface{}{
+					"stream": map[string]interface{}{
+						"source": "test",
+					},
+					"app": map[string]interface{}{
+						"source": "component.app",
+					},
+					"level": nil,
+				},
+			},
+			4,
+			false,
+		},
+	}
+	for tName, tt := range tests {
+		tt := tt
+		t.Run(tName, func(t *testing.T) {
+			c, err := newJSONConfig(tt.config)
+			if err != nil {
+				t.Fatalf("failed to create config: %s", err)
+			}
+			got, err := c.validate()
+			if (err != nil) != tt.wantErr {
+				t.Errorf("JSONConfig.validate() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if len(got) != tt.wantExprCount {
+				t.Errorf("expressions count = %v, want %v", len(got), tt.wantExprCount)
+			}
+		})
+	}
+}
+
+var logFixture = `
+{
+	"time":"2012-11-01T22:08:41+00:00",
+	"app":"loki",
+	"component": ["parser","type"],
+	"level" : "WARN",
+	"nested" : {"child":"value"},
+	"message" : "this is a log line",
+	"complex" : {
+		"log" : {"array":[{"test1":"test2"},{"test3":"test4"}],"prop":"value","prop2":"val2"}
+	}
+}
+`
+
+func TestJSONParser_Parse(t *testing.T) {
+	t.Parallel()
+
+	tests := map[string]struct {
+		config         interface{}
+		entry          string
+		expectedEntry  string
+		t              time.Time
+		expectedT      time.Time
+		labels         map[string]string
+		expectedLabels map[string]string
+	}{
+		"replace all": {
+			map[string]interface{}{
+				"labels": map[string]interface{}{
+					"level": nil,
+					"component": map[string]interface{}{
+						"source": "component[0]",
+					},
+					"module": map[string]interface{}{
+						"source": "app",
+					},
+				},
+				"output": map[string]interface{}{
+					"source": "complex.log",
+				},
+				"timestamp": map[string]interface{}{
+					"source": "time",
+					"format": "RFC3339",
+				},
+			},
+			logFixture,
+			`{"array":[{"test1":"test2"},{"test3":"test4"}],"prop":"value","prop2":"val2"}`,
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			mustParseTime(time.RFC3339, "2012-11-01T22:08:41+00:00"),
+			map[string]string{"stream": "stdout"},
+			map[string]string{"stream": "stdout", "level": "WARN", "component": "parser", "module": "loki"},
+		},
+		"invalid json": {
+			map[string]interface{}{
+				"labels": map[string]interface{}{
+					"level": nil,
+				},
+			},
+			"ts=now log=notjson",
+			"ts=now log=notjson",
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			map[string]string{"stream": "stdout"},
+			map[string]string{"stream": "stdout"},
+		},
+		"invalid timestamp skipped": {
+			map[string]interface{}{
+				"labels": map[string]interface{}{
+					"level": nil,
+				},
+				"timestamp": map[string]interface{}{
+					"source": "time",
+					"format": "invalid",
+				},
+			},
+			logFixture,
+			logFixture,
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			map[string]string{"stream": "stdout"},
+			map[string]string{"stream": "stdout", "level": "WARN"},
+		},
+		"invalid labels skipped": {
+			map[string]interface{}{
+				"labels": map[string]interface{}{
+					"notexisting": nil,
+					"level": map[string]interface{}{
+						"source": "doesnotexist",
+					},
+				},
+			},
+			logFixture,
+			logFixture,
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			map[string]string{"stream": "stdout"},
+			map[string]string{"stream": "stdout"},
+		},
+		"invalid output skipped": {
+			map[string]interface{}{
+				"output": map[string]interface{}{
+					"source": "doesnotexist",
+				},
+			},
+			logFixture,
+			logFixture,
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			map[string]string{"stream": "stdout"},
+			map[string]string{"stream": "stdout"},
+		},
+		"string output": {
+			map[string]interface{}{
+				"output": map[string]interface{}{
+					"source": "message",
+				},
+			},
+			logFixture,
+			"this is a log line",
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			map[string]string{"stream": "stdout"},
+			map[string]string{"stream": "stdout"},
+		},
+	}
+	for tName, tt := range tests {
+		tt := tt
+		t.Run(tName, func(t *testing.T) {
+			t.Parallel()
+			p, err := NewJSON(util.Logger, tt.config)
+			if err != nil {
+				t.Fatalf("failed to create json parser: %s", err)
+			}
+			lbs := toLabelSet(tt.labels)
+			p.Process(lbs, &tt.t, &tt.entry)
+
+			assertLabels(t, tt.expectedLabels, lbs)
+			assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry")
+			if tt.t.Unix() != tt.expectedT.Unix() {
+				t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t)
+			}
+		})
+	}
+}
diff --git a/pkg/logentry/stages/regex.go b/pkg/logentry/stages/regex.go
new file mode 100644
index 0000000000000000000000000000000000000000..e44ac163f6a8c9e1ab4dadeb7c6ef68cb7997243
--- /dev/null
+++ b/pkg/logentry/stages/regex.go
@@ -0,0 +1,193 @@
+package stages
+
+import (
+	"fmt"
+	"regexp"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/mitchellh/mapstructure"
+	"github.com/pkg/errors"
+	"github.com/prometheus/common/model"
+)
+
+// RegexTimestamp configures timestamp extraction
+type RegexTimestamp struct {
+	Source *string `mapstructure:"source"`
+	Format string  `mapstructure:"format"`
+}
+
+// RegexLabel configures a labels value extraction
+type RegexLabel struct {
+	Source *string `mapstructure:"source"`
+}
+
+// RegexOutput configures output value extraction
+type RegexOutput struct {
+	Source *string `mapstructure:"source"`
+}
+
+// RegexConfig configures the log entry parser to extract value from regex
+type RegexConfig struct {
+	Timestamp  *RegexTimestamp        `mapstructure:"timestamp"`
+	Expression string                 `mapstructure:"expression"`
+	Output     *RegexOutput           `mapstructure:"output"`
+	Labels     map[string]*RegexLabel `mapstructure:"labels"`
+}
+
+func newRegexConfig(config interface{}) (*RegexConfig, error) {
+	cfg := &RegexConfig{}
+	err := mapstructure.Decode(config, cfg)
+	if err != nil {
+		return nil, err
+	}
+	return cfg, nil
+}
+
+// Config Errors
+const (
+	ErrExpressionRequired      = "expression is required"
+	ErrCouldNotCompileRegex    = "could not compile regular expression"
+	ErrEmptyRegexStageConfig   = "empty regex parser configuration"
+	ErrOutputSourceRequired    = "output source value is required if output is specified"
+	ErrTimestampSourceRequired = "timestamp source value is required if timestamp is specified"
+	ErrTimestampGroupRequired  = "regex must contain a named group to match the timestamp with name: %s"
+	ErrTimestampFormatRequired = "timestamp format is required"
+	ErrInvalidLabelName        = "invalid label name: %s"
+)
+
+// validate the config and return a
+func (c *RegexConfig) validate() (*regexp.Regexp, error) {
+
+	if c.Output == nil && len(c.Labels) == 0 && c.Timestamp == nil {
+		return nil, errors.New(ErrEmptyRegexStageConfig)
+	}
+
+	if c.Expression == "" {
+		return nil, errors.New(ErrExpressionRequired)
+	}
+
+	expr, err := regexp.Compile(c.Expression)
+	if err != nil {
+		return nil, errors.Wrap(err, ErrCouldNotCompileRegex)
+	}
+
+	if c.Output != nil && (c.Output.Source == nil || (c.Output.Source != nil && *c.Output.Source == "")) {
+		return nil, errors.New(ErrOutputSourceRequired)
+	}
+
+	if c.Timestamp != nil {
+		if c.Timestamp.Source == nil || *c.Timestamp.Source == "" {
+			return nil, errors.New(ErrTimestampSourceRequired)
+		}
+		if c.Timestamp.Format == "" {
+			return nil, errors.New(ErrTimestampFormatRequired)
+		}
+		foundName := false
+		for _, n := range expr.SubexpNames() {
+			if n == *c.Timestamp.Source {
+				foundName = true
+			}
+		}
+		if !foundName {
+			return nil, errors.Errorf(ErrTimestampGroupRequired, *c.Timestamp.Source)
+		}
+		c.Timestamp.Format = convertDateLayout(c.Timestamp.Format)
+	}
+
+	for labelName, labelSrc := range c.Labels {
+		if !model.LabelName(labelName).IsValid() {
+			return nil, fmt.Errorf(ErrInvalidLabelName, labelName)
+		}
+		if labelSrc == nil || *labelSrc.Source == "" {
+			lName := labelName
+			c.Labels[labelName] = &RegexLabel{
+				&lName,
+			}
+		}
+	}
+
+	return expr, nil
+}
+
+type regexStage struct {
+	cfg        *RegexConfig
+	expression *regexp.Regexp
+	logger     log.Logger
+}
+
+// NewRegex creates a new regular expression based pipeline processing stage.
+func NewRegex(logger log.Logger, config interface{}) (Stage, error) {
+	cfg, err := newRegexConfig(config)
+	if err != nil {
+		return nil, err
+	}
+	expression, err := cfg.validate()
+	if err != nil {
+		return nil, err
+	}
+	return &regexStage{
+		cfg:        cfg,
+		expression: expression,
+		logger:     log.With(logger, "component", "parser", "type", "regex"),
+	}, nil
+}
+
+// Process implements a pipeline stage
+func (r *regexStage) Process(labels model.LabelSet, t *time.Time, entry *string) {
+	if entry == nil {
+		level.Debug(r.logger).Log("msg", "cannot parse a nil entry")
+		return
+	}
+
+	match := r.expression.FindStringSubmatch(*entry)
+	if match == nil {
+		level.Debug(r.logger).Log("msg", "regex failed to match")
+		return
+	}
+	groups := make(map[string]string)
+	for i, name := range r.expression.SubexpNames() {
+		if i != 0 && name != "" {
+			groups[name] = match[i]
+		}
+	}
+
+	// Parsing timestamp.
+	if r.cfg.Timestamp != nil {
+		if ts, ok := groups[*r.cfg.Timestamp.Source]; ok {
+			parsedTs, err := time.Parse(r.cfg.Timestamp.Format, ts)
+			if err != nil {
+				level.Debug(r.logger).Log("msg", "failed to parse time", "err", err, "format", r.cfg.Timestamp.Format, "value", ts)
+			} else {
+				*t = parsedTs
+			}
+		} else {
+			level.Debug(r.logger).Log("msg", "regex didn't match timestamp source")
+		}
+	}
+
+	// Parsing labels.
+	for lName, lSrc := range r.cfg.Labels {
+		lValue, ok := groups[*lSrc.Source]
+		if !ok {
+			continue
+		}
+		labelValue := model.LabelValue(lValue)
+		if !labelValue.IsValid() {
+			level.Debug(r.logger).Log("msg", "invalid label value parsed", "value", labelValue)
+			continue
+		}
+		labels[model.LabelName(lName)] = labelValue
+	}
+
+	// Parsing output.
+	if r.cfg.Output != nil {
+		if o, ok := groups[*r.cfg.Output.Source]; ok {
+			*entry = o
+		} else {
+			level.Debug(r.logger).Log("msg", "regex didn't match output source")
+		}
+	}
+
+}
diff --git a/pkg/logentry/stages/regex_test.go b/pkg/logentry/stages/regex_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..17c9de914230c55c3fd7abc08ab94e2c8358d5bd
--- /dev/null
+++ b/pkg/logentry/stages/regex_test.go
@@ -0,0 +1,417 @@
+package stages
+
+import (
+	"fmt"
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/pkg/errors"
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/assert"
+	"gopkg.in/yaml.v2"
+)
+
+var regexCfg = `regex: 
+  timestamp:
+    source: time
+    format: RFC3339
+  labels:
+    stream:
+      source: stream
+  output:
+    source: log`
+
+// nolint
+func TestRegexMapStructure(t *testing.T) {
+	t.Parallel()
+
+	// testing that we can use yaml data into mapstructure.
+	var mapstruct map[interface{}]interface{}
+	if err := yaml.Unmarshal([]byte(regexCfg), &mapstruct); err != nil {
+		t.Fatalf("error while un-marshalling config: %s", err)
+	}
+	p, ok := mapstruct["regex"].(map[interface{}]interface{})
+	if !ok {
+		t.Fatalf("could not read parser %+v", mapstruct["regex"])
+	}
+	got, err := newRegexConfig(p)
+	if err != nil {
+		t.Fatalf("could not create parser from yaml: %s", err)
+	}
+	want := &RegexConfig{
+		Labels: map[string]*RegexLabel{
+			"stream": {
+				Source: String("stream"),
+			},
+		},
+		Output: &RegexOutput{Source: String("log")},
+		Timestamp: &RegexTimestamp{
+			Format: "RFC3339",
+			Source: String("time"),
+		},
+	}
+	if !reflect.DeepEqual(got, want) {
+		t.Fatalf("want: %+v got: %+v", want, got)
+	}
+}
+
+func TestRegexConfig_validate(t *testing.T) {
+	t.Parallel()
+	tests := map[string]struct {
+		config interface{}
+		err    error
+	}{
+		"empty": {
+			map[string]interface{}{},
+			errors.New(ErrEmptyRegexStageConfig),
+		},
+		"missing output info": {
+			map[string]interface{}{
+				"expression": ".*",
+				"output":     map[string]interface{}{},
+			},
+			errors.New(ErrOutputSourceRequired),
+		},
+		"missing regex_expression": {
+			map[string]interface{}{
+				"output": map[string]interface{}{},
+			},
+			errors.New(ErrExpressionRequired),
+		},
+		"invalid regex_expression": {
+			map[string]interface{}{
+				"expression": "(?P<ts[0-9]+).*",
+				"output":     map[string]interface{}{},
+			},
+			errors.New(ErrCouldNotCompileRegex + ": error parsing regexp: invalid named capture: `(?P<ts[0-9]+).*`"),
+		},
+		"missing output source": {
+			map[string]interface{}{
+				"expression": ".*",
+				"output": map[string]interface{}{
+					"source": "",
+				},
+			},
+			errors.New(ErrOutputSourceRequired),
+		},
+		"invalid output source": {
+			map[string]interface{}{
+				"expression": ".*",
+				"output": map[string]interface{}{
+					"source": "[",
+				},
+			},
+			nil,
+		},
+		"missing timestamp source": {
+			map[string]interface{}{
+				"expression": ".*",
+				"timestamp": map[string]interface{}{
+					"source": "",
+					"format": "ANSIC",
+				},
+			},
+			errors.New(ErrTimestampSourceRequired),
+		},
+		"missing timestamp format": {
+			map[string]interface{}{
+				"expression": ".*",
+				"timestamp": map[string]interface{}{
+					"source": "test",
+					"format": "",
+				},
+			},
+			errors.New(ErrTimestampFormatRequired),
+		},
+		"invalid label name": {
+			map[string]interface{}{
+				"expression": ".*",
+				"labels": map[string]interface{}{
+					"": map[string]interface{}{},
+				},
+			},
+			fmt.Errorf(ErrInvalidLabelName, ""),
+		},
+		"invalid label source": {
+			map[string]interface{}{
+				"expression": ".*",
+				"labels": map[string]interface{}{
+					"stream": map[string]interface{}{
+						"source": "]",
+					},
+				},
+			},
+			nil,
+		},
+		"missing_timestamp_group": {
+			map[string]interface{}{
+				"expression": ".*",
+				"timestamp": map[string]interface{}{
+					"source": "ts",
+					"format": "RFC3339",
+				},
+			},
+			errors.Errorf(ErrTimestampGroupRequired, "ts"),
+		},
+		"valid": {
+			map[string]interface{}{
+				"expression": "(?P<ts>[0-9]+).*",
+				"output": map[string]interface{}{
+					"source": "log",
+				},
+				"timestamp": map[string]interface{}{
+					"source": "ts",
+					"format": "RFC3339",
+				},
+				"labels": map[string]interface{}{
+					"stream": map[string]interface{}{
+						"source": "test",
+					},
+					"app": map[string]interface{}{
+						"source": "app",
+					},
+					"level": nil,
+				},
+			},
+			nil,
+		},
+	}
+	for tName, tt := range tests {
+		tt := tt
+		t.Run(tName, func(t *testing.T) {
+			c, err := newRegexConfig(tt.config)
+			if err != nil {
+				t.Fatalf("failed to create config: %s", err)
+			}
+			_, err = c.validate()
+			if (err != nil) != (tt.err != nil) {
+				t.Errorf("RegexConfig.validate() expected error = %v, actual error = %v", tt.err, err)
+				return
+			}
+			if (err != nil) && (err.Error() != tt.err.Error()) {
+				t.Errorf("RegexConfig.validate() expected error = %v, actual error = %v", tt.err, err)
+				return
+			}
+		})
+	}
+}
+
+var (
+	regexLogFixture                 = `11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] "GET /1986.js HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"`
+	regexLogFixtureMissingLabel     = `2016-10-06T00:17:09.669794202Z stdout k `
+	regexLogFixtureInvalidTimestamp = `2016-10-06sfsT00:17:09.669794202Z stdout k `
+)
+
+func TestRegexParser_Parse(t *testing.T) {
+	t.Parallel()
+
+	est, err := time.LoadLocation("America/New_York")
+	if err != nil {
+		t.Fatal("could not parse timestamp", err)
+	}
+
+	utc, err := time.LoadLocation("UTC")
+	if err != nil {
+		t.Fatal("could not parse timestamp", err)
+	}
+
+	tests := map[string]struct {
+		config         interface{}
+		entry          string
+		expectedEntry  string
+		t              time.Time
+		expectedT      time.Time
+		labels         map[string]string
+		expectedLabels map[string]string
+	}{
+		"happy path": {
+			map[string]interface{}{
+				"expression": "^(?P<ip>\\S+) (?P<identd>\\S+) (?P<user>\\S+) \\[(?P<timestamp>[\\w:/]+\\s[+\\-]\\d{4})\\] \"(?P<action>\\S+)\\s?(?P<path>\\S+)?\\s?(?P<protocol>\\S+)?\" (?P<status>\\d{3}|-) (?P<size>\\d+|-)\\s?\"?(?P<referer>[^\"]*)\"?\\s?\"?(?P<useragent>[^\"]*)?\"?$",
+				"timestamp": map[string]interface{}{
+					"source": "timestamp",
+					"format": "02/Jan/2006:15:04:05 -0700",
+				},
+				"labels": map[string]interface{}{
+					"action": map[string]interface{}{
+						"source": "action",
+					},
+					"status_code": map[string]interface{}{
+						"source": "status",
+					},
+				},
+			},
+			regexLogFixture,
+			regexLogFixture,
+			time.Now(),
+			time.Date(2000, 01, 25, 14, 00, 01, 0, est),
+			nil,
+			map[string]string{
+				"action":      "GET",
+				"status_code": "200",
+			},
+		},
+		"modify output": {
+			map[string]interface{}{
+				"expression": "^(?P<ip>\\S+) (?P<identd>\\S+) (?P<user>\\S+) \\[(?P<timestamp>[\\w:/]+\\s[+\\-]\\d{4})\\] \"(?P<action>\\S+)\\s?(?P<path>\\S+)?\\s?(?P<protocol>\\S+)?\" (?P<status>\\d{3}|-) (?P<size>\\d+|-)\\s?\"?(?P<referer>[^\"]*)\"?\\s?\"?(?P<useragent>[^\"]*)?\"?$",
+				"timestamp": map[string]interface{}{
+					"source": "timestamp",
+					"format": "02/Jan/2006:15:04:05 -0700",
+				},
+				"labels": map[string]interface{}{
+					"action": map[string]interface{}{
+						"source": "action",
+					},
+					"status_code": map[string]interface{}{
+						"source": "status",
+					},
+				},
+				"output": map[string]interface{}{
+					"source": "path",
+				},
+			},
+			regexLogFixture,
+			"/1986.js",
+			time.Now(),
+			time.Date(2000, 01, 25, 14, 00, 01, 0, est),
+			nil,
+			map[string]string{
+				"action":      "GET",
+				"status_code": "200",
+			},
+		},
+		"missing label": {
+			map[string]interface{}{
+				"expression": "^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<flags>\\S+?) (?P<missing_label>.*)$",
+				"timestamp": map[string]interface{}{
+					"source": "time",
+					"format": "RFC3339",
+				},
+				"labels": map[string]interface{}{
+					"stream": map[string]interface{}{
+						"source": "stream",
+					},
+					"missing_label": map[string]interface{}{
+						"source": "missing_label",
+					},
+				},
+			},
+			regexLogFixtureMissingLabel,
+			regexLogFixtureMissingLabel,
+			time.Now(),
+			time.Date(2016, 10, 06, 00, 17, 9, 669794202, utc),
+			nil,
+			map[string]string{
+				"stream":        "stdout",
+				"missing_label": "",
+			},
+		},
+		"invalid timestamp skipped": {
+			map[string]interface{}{
+				"expression": "^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<flags>\\S+?) (?P<message>.*)$",
+				"timestamp": map[string]interface{}{
+					"source": "time",
+					"format": "UnixDate",
+				},
+				"labels": map[string]interface{}{
+					"stream": map[string]interface{}{
+						"source": "stream",
+					},
+				},
+			},
+			regexLogFixtureInvalidTimestamp,
+			regexLogFixtureInvalidTimestamp,
+			time.Date(2019, 10, 06, 00, 17, 9, 0, utc),
+			time.Date(2019, 10, 06, 00, 17, 9, 0, utc),
+			nil,
+			map[string]string{
+				"stream": "stdout",
+			},
+		},
+		"match failed": {
+			map[string]interface{}{
+				"expression": "^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<flags>\\S+?) (?P<message>.*)$",
+				"timestamp": map[string]interface{}{
+					"source": "time",
+					"format": "UnixDate",
+				},
+				"labels": map[string]interface{}{
+					"stream": map[string]interface{}{
+						"source": "stream",
+					},
+				},
+			},
+			"blahblahblah",
+			"blahblahblah",
+			time.Date(2019, 10, 06, 00, 17, 9, 0, utc),
+			time.Date(2019, 10, 06, 00, 17, 9, 0, utc),
+			map[string]string{
+				"stream": "stdout",
+			},
+			map[string]string{
+				"stream": "stdout",
+			},
+		},
+	}
+	for tName, tt := range tests {
+		tt := tt
+		t.Run(tName, func(t *testing.T) {
+			t.Parallel()
+			p, err := NewRegex(util.Logger, tt.config)
+			if err != nil {
+				t.Fatalf("failed to create regex parser: %s", err)
+			}
+			lbs := toLabelSet(tt.labels)
+			p.Process(lbs, &tt.t, &tt.entry)
+
+			assertLabels(t, tt.expectedLabels, lbs)
+			assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry")
+			if tt.t.Unix() != tt.expectedT.Unix() {
+				t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t)
+			}
+		})
+	}
+}
+
+func Benchmark(b *testing.B) {
+	benchmarks := []struct {
+		name   string
+		config map[string]interface{}
+		entry  string
+	}{
+		{
+			"apache common log",
+			map[string]interface{}{
+				"expression": "^(?P<ip>\\S+) (?P<identd>\\S+) (?P<user>\\S+) \\[(?P<timestamp>[\\w:/]+\\s[+\\-]\\d{4})\\] \"(?P<action>\\S+)\\s?(?P<path>\\S+)?\\s?(?P<protocol>\\S+)?\" (?P<status>\\d{3}|-) (?P<size>\\d+|-)\\s?\"?(?P<referer>[^\"]*)\"?\\s?\"?(?P<useragent>[^\"]*)?\"?$",
+				"timestamp": map[string]interface{}{
+					"source": "timestamp",
+					"format": "02/Jan/2006:15:04:05 -0700",
+				},
+				"labels": map[string]interface{}{
+					"action": map[string]interface{}{
+						"source": "action",
+					},
+					"status_code": map[string]interface{}{
+						"source": "status",
+					},
+				},
+			},
+			regexLogFixture,
+		},
+	}
+	for _, bm := range benchmarks {
+		b.Run(bm.name, func(b *testing.B) {
+			stage, err := NewRegex(util.Logger, bm.config)
+			if err != nil {
+				panic(err)
+			}
+			labels := model.LabelSet{}
+			ts := time.Now()
+			for i := 0; i < b.N; i++ {
+				entry := bm.entry
+				stage.Process(labels, &ts, &entry)
+			}
+		})
+	}
+}
diff --git a/pkg/logentry/stages/stage.go b/pkg/logentry/stages/stage.go
new file mode 100644
index 0000000000000000000000000000000000000000..f9c319d7b27bea0ab9cb404a710e8fda300eeda0
--- /dev/null
+++ b/pkg/logentry/stages/stage.go
@@ -0,0 +1,13 @@
+package stages
+
+import (
+	"time"
+
+	"github.com/prometheus/common/model"
+)
+
+// Stage takes an existing set of labels, timestamp and log entry and returns either a possibly mutated
+// timestamp and log entry
+type Stage interface {
+	Process(labels model.LabelSet, time *time.Time, entry *string)
+}
diff --git a/pkg/logentry/stages/util.go b/pkg/logentry/stages/util.go
new file mode 100644
index 0000000000000000000000000000000000000000..89ce1635e2337021f00e392722edb134e319f470
--- /dev/null
+++ b/pkg/logentry/stages/util.go
@@ -0,0 +1,31 @@
+package stages
+
+import "time"
+
+// convertDateLayout converts pre-defined date format layout into date format
+func convertDateLayout(predef string) string {
+	switch predef {
+	case "ANSIC":
+		return time.ANSIC
+	case "UnixDate":
+		return time.UnixDate
+	case "RubyDate":
+		return time.RubyDate
+	case "RFC822":
+		return time.RFC822
+	case "RFC822Z":
+		return time.RFC822Z
+	case "RFC850":
+		return time.RFC850
+	case "RFC1123":
+		return time.RFC1123
+	case "RFC1123Z":
+		return time.RFC1123Z
+	case "RFC3339":
+		return time.RFC3339
+	case "RFC3339Nano":
+		return time.RFC3339Nano
+	default:
+		return predef
+	}
+}
diff --git a/pkg/logentry/stages/util_test.go b/pkg/logentry/stages/util_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..26bd2f76834dffeff92c275ea2818a892f536480
--- /dev/null
+++ b/pkg/logentry/stages/util_test.go
@@ -0,0 +1,39 @@
+package stages
+
+import (
+	"testing"
+	"time"
+
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/assert"
+)
+
+// nolint
+func mustParseTime(layout, value string) time.Time {
+	t, err := time.Parse(layout, value)
+	if err != nil {
+		panic(err)
+	}
+	return t
+}
+
+func toLabelSet(lbs map[string]string) model.LabelSet {
+	res := model.LabelSet{}
+	for k, v := range lbs {
+		res[model.LabelName(k)] = model.LabelValue(v)
+	}
+	return res
+}
+
+func assertLabels(t *testing.T, expect map[string]string, got model.LabelSet) {
+	if len(expect) != len(got) {
+		t.Fatalf("labels are not equal in size want: %s got: %s", expect, got)
+	}
+	for k, v := range expect {
+		gotV, ok := got[model.LabelName(k)]
+		if !ok {
+			t.Fatalf("missing expected label key: %s", k)
+		}
+		assert.Equal(t, model.LabelValue(v), gotV, "mismatch label value")
+	}
+}
diff --git a/pkg/promtail/scrape/scrape.go b/pkg/promtail/scrape/scrape.go
index 932b51e433880e43604cfdae8106208fca301857..d2e8402297a79876026f6812b611ab3b6a27f225 100644
--- a/pkg/promtail/scrape/scrape.go
+++ b/pkg/promtail/scrape/scrape.go
@@ -6,6 +6,7 @@ import (
 	sd_config "github.com/prometheus/prometheus/discovery/config"
 	"github.com/prometheus/prometheus/pkg/relabel"
 
+	"github.com/grafana/loki/pkg/logentry"
 	"github.com/grafana/loki/pkg/promtail/api"
 )
 
@@ -13,6 +14,7 @@ import (
 type Config struct {
 	JobName                string                           `yaml:"job_name,omitempty"`
 	EntryParser            api.EntryParser                  `yaml:"entry_parser"`
+	PipelineStages         logentry.PipelineStages          `yaml:"pipeline_stages,omitempty"`
 	RelabelConfigs         []*relabel.Config                `yaml:"relabel_configs,omitempty"`
 	ServiceDiscoveryConfig sd_config.ServiceDiscoveryConfig `yaml:",inline"`
 }
diff --git a/pkg/promtail/scrape/scrape_test.go b/pkg/promtail/scrape/scrape_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..48ccba99d440601b400db18409daee568a45a661
--- /dev/null
+++ b/pkg/promtail/scrape/scrape_test.go
@@ -0,0 +1,72 @@
+package scrape
+
+import (
+	"testing"
+
+	"gopkg.in/yaml.v2"
+)
+
+// todo add full example.
+var testYaml = `
+pipeline_stages:
+  - regex:
+      expr: "./*"
+  - json: 
+      timestamp:
+        source: time
+        format: RFC3339
+      labels:
+        stream:
+          source: json_key_name.json_sub_key_name
+      output:
+        source: log     
+job_name: kubernetes-pods-name
+kubernetes_sd_configs:
+- role: pod
+relabel_configs:
+- source_labels:
+  - __meta_kubernetes_pod_label_name
+  target_label: __service__
+- source_labels:
+  - __meta_kubernetes_pod_node_name
+  target_label: __host__
+- action: drop
+  regex: ^$
+  source_labels:
+  - __service__
+- action: replace
+  replacement: $1
+  separator: /
+  source_labels:
+  - __meta_kubernetes_namespace
+  - __service__
+  target_label: job
+- action: replace
+  source_labels:
+  - __meta_kubernetes_namespace
+  target_label: namespace
+- action: replace
+  source_labels:
+  - __meta_kubernetes_pod_name
+  target_label: instance
+- action: replace
+  source_labels:
+  - __meta_kubernetes_pod_container_name
+  target_label: container_name
+- action: labelmap
+  regex: __meta_kubernetes_pod_label_(.+)
+- replacement: /var/log/pods/$1/*.log
+  separator: /
+  source_labels:
+  - __meta_kubernetes_pod_uid
+  - __meta_kubernetes_pod_container_name
+  target_label: __path__
+`
+
+func TestLoadConfig(t *testing.T) {
+	var config Config
+	err := yaml.Unmarshal([]byte(testYaml), &config)
+	if err != nil {
+		panic(err)
+	}
+}
diff --git a/pkg/promtail/targets/filetargetmanager.go b/pkg/promtail/targets/filetargetmanager.go
index 6aded414e5483e684bfdeb2aebe8c0b59d85de40..a3a53039c843a138280fc9b2ec6ed8b42b991610 100644
--- a/pkg/promtail/targets/filetargetmanager.go
+++ b/pkg/promtail/targets/filetargetmanager.go
@@ -19,6 +19,8 @@ import (
 	"github.com/prometheus/prometheus/relabel"
 
 	"github.com/grafana/loki/pkg/helpers"
+	"github.com/grafana/loki/pkg/logentry"
+	"github.com/grafana/loki/pkg/logentry/stages"
 	"github.com/grafana/loki/pkg/promtail/api"
 	"github.com/grafana/loki/pkg/promtail/positions"
 	"github.com/grafana/loki/pkg/promtail/scrape"
@@ -40,6 +42,12 @@ var (
 		Name:      "targets_active_total",
 		Help:      "Number of active total.",
 	})
+	pipelineDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "promtail",
+		Name:      "pipeline_duration_microseconds",
+		Help:      "Label extraction pipeline processing time, in microseconds",
+		Buckets:   []float64{5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 25000},
+	}, []string{"job_name"})
 )
 
 // FileTargetManager manages a set of targets.
@@ -73,6 +81,36 @@ func NewFileTargetManager(
 
 	config := map[string]sd_config.ServiceDiscoveryConfig{}
 	for _, cfg := range scrapeConfigs {
+		obs := pipelineDuration.WithLabelValues(cfg.JobName)
+		pipeline, err := logentry.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, &obs)
+		if err != nil {
+			return nil, err
+		}
+
+		// Backwards compatibility with old EntryParser config
+		if pipeline.Size() == 0 {
+			switch cfg.EntryParser {
+			case api.CRI:
+				level.Warn(logger).Log("msg", "WARNING!!! entry_parser config is deprecated, please change to pipeline_stages")
+				cri, err := stages.NewCRI(logger)
+				if err != nil {
+					return nil, err
+				}
+				pipeline.AddStage(cri)
+			case api.Docker:
+				level.Warn(logger).Log("msg", "WARNING!!! entry_parser config is deprecated, please change to pipeline_stages")
+				docker, err := stages.NewDocker(logger)
+				if err != nil {
+					return nil, err
+				}
+				pipeline.AddStage(docker)
+			case api.Raw:
+				level.Warn(logger).Log("msg", "WARNING!!! entry_parser config is deprecated, please change to pipeline_stages")
+			default:
+
+			}
+		}
+
 		s := &targetSyncer{
 			log:            logger,
 			positions:      positions,
@@ -80,7 +118,7 @@ func NewFileTargetManager(
 			targets:        map[string]*FileTarget{},
 			droppedTargets: []Target{},
 			hostname:       hostname,
-			entryHandler:   cfg.EntryParser.Wrap(client),
+			entryHandler:   pipeline.Wrap(client),
 			targetConfig:   targetConfig,
 		}
 		tm.syncers[cfg.JobName] = s
diff --git a/production/helm/promtail/values.yaml b/production/helm/promtail/values.yaml
index b2020dbd4802db4be5860370e4256fa377aca6ac..285dbc64016689cbdbce3dadc9c502168dad9d34 100644
--- a/production/helm/promtail/values.yaml
+++ b/production/helm/promtail/values.yaml
@@ -6,8 +6,6 @@ annotations: {}
 
 deploymentStrategy: RollingUpdate
 
-entryParser: docker
-
 image:
   repository: grafana/promtail
   tag: latest
@@ -124,8 +122,9 @@ config:
     # Period to resync directories being watched and files being tailed
     sync_period: 10s
   scrape_configs:
-  - entry_parser: '{{ .Values.entryParser }}'
-    job_name: kubernetes-pods-name
+  - job_name: kubernetes-pods-name
+    pipeline_stages:
+      - docker: {}
     kubernetes_sd_configs:
     - role: pod
     relabel_configs:
@@ -166,8 +165,9 @@ config:
       - __meta_kubernetes_pod_uid
       - __meta_kubernetes_pod_container_name
       target_label: __path__
-  - entry_parser: '{{ .Values.entryParser }}'
-    job_name: kubernetes-pods-app
+  - job_name: kubernetes-pods-app
+    pipeline_stages:
+      - docker: {}
     kubernetes_sd_configs:
     - role: pod
     relabel_configs:
@@ -212,8 +212,9 @@ config:
       - __meta_kubernetes_pod_uid
       - __meta_kubernetes_pod_container_name
       target_label: __path__
-  - entry_parser: '{{ .Values.entryParser }}'
-    job_name: kubernetes-pods-direct-controllers
+  - job_name: kubernetes-pods-direct-controllers
+    pipeline_stages:
+      - docker: {}
     kubernetes_sd_configs:
     - role: pod
     relabel_configs:
@@ -264,8 +265,9 @@ config:
       - __meta_kubernetes_pod_uid
       - __meta_kubernetes_pod_container_name
       target_label: __path__
-  - entry_parser: '{{ .Values.entryParser }}'
-    job_name: kubernetes-pods-indirect-controller
+  - job_name: kubernetes-pods-indirect-controller
+    pipeline_stages:
+      - docker: {}
     kubernetes_sd_configs:
     - role: pod
     relabel_configs:
@@ -318,8 +320,9 @@ config:
       - __meta_kubernetes_pod_uid
       - __meta_kubernetes_pod_container_name
       target_label: __path__
-  - entry_parser: '{{ .Values.entryParser }}'
-    job_name: kubernetes-pods-static
+  - job_name: kubernetes-pods-static
+    pipeline_stages:
+      - docker: {}
     kubernetes_sd_configs:
     - role: pod
     relabel_configs:
diff --git a/production/ksonnet/promtail/config.libsonnet b/production/ksonnet/promtail/config.libsonnet
index 01ace73f47a347ebef8e22fbe0713916fbf6d307..a3df83eba51fbfa6f1a8159e05808e360bef7eae 100644
--- a/production/ksonnet/promtail/config.libsonnet
+++ b/production/ksonnet/promtail/config.libsonnet
@@ -14,7 +14,9 @@
         external_labels: {},
       }],
       container_root_path: '/var/lib/docker',
-      entry_parser: 'docker',
+      pipeline_stages: [{
+        docker: {},
+      }],
     },
   },
 }
diff --git a/production/ksonnet/promtail/scrape_config.libsonnet b/production/ksonnet/promtail/scrape_config.libsonnet
index 81eba8855e6f69d7c09294abdf665023761f9190..4ea41857b04a86f7746da8a7e3fb5e852c649869 100644
--- a/production/ksonnet/promtail/scrape_config.libsonnet
+++ b/production/ksonnet/promtail/scrape_config.libsonnet
@@ -3,7 +3,7 @@ local config = import 'config.libsonnet';
 config + {
   local gen_scrape_config(job_name, pod_uid) = {
     job_name: job_name,
-    entry_parser: $._config.promtail_config.entry_parser,
+    pipeline_stages: $._config.promtail_config.pipeline_stages,
     kubernetes_sd_configs: [{
       role: 'pod',
     }],