Skip to content
Snippets Groups Projects
Commit ebb4f483 authored by Edward Welch's avatar Edward Welch Committed by Ed
Browse files

updating tests

adding CRI and Docker format pipeline stage extensions
adding backwards compatibility with current config
parent 0fac6180
No related branches found
No related tags found
No related merge requests found
......@@ -24,7 +24,8 @@ func NewPipeline(log log.Logger, stgs PipelineStages) (*Pipeline, error) {
for _, s := range stgs {
stage, ok := s.(map[interface{}]interface{})
if !ok {
return nil, errors.New("invalid YAML config")
return nil, errors.Errorf("invalid YAML config, "+
"make sure each stage of your pipeline is a YAML object (must end with a `:`), check stage `- %s`", s)
}
if len(stage) > 1 {
return nil, errors.New("pipeline stage must contain only one key")
......@@ -47,6 +48,18 @@ func NewPipeline(log log.Logger, stgs PipelineStages) (*Pipeline, error) {
return nil, errors.Wrap(err, "invalid regex stage config")
}
st = append(st, regex)
case "docker":
docker, err := stages.NewDocker(log)
if err != nil {
return nil, errors.Wrap(err, "invalid docker stage config")
}
st = append(st, docker)
case "cri":
cri, err := stages.NewCri(log)
if err != nil {
return nil, errors.Wrap(err, "invalid cri stage config")
}
st = append(st, cri)
}
}
}
......@@ -71,3 +84,13 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
return next.Handle(labels, timestamp, line)
})
}
// AddStage adds a stage to the pipeline
func (p *Pipeline) AddStage(stage stages.Stage) {
p.stages = append(p.stages, stage)
}
// Size gets the current number of stages in the pipeline
func (p *Pipeline) Size() int {
return len(p.stages)
}
......@@ -2,27 +2,31 @@ package logentry
import (
"testing"
"time"
"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v2"
)
var rawTestLine = `{"log":"11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] \"GET /1986.js HTTP/1.1\" 200 932 \"-\" \"Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6\"","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z"}`
var processedTestLine = `11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] "GET /1986.js HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"`
var testYaml = `
pipeline_stages:
- docker:
- regex:
expression: "./*"
labels:
stream:
- json:
expression: "^(?P<ip>\\S+) (?P<identd>\\S+) (?P<user>\\S+) \\[(?P<timestamp>[\\w:/]+\\s[+\\-]\\d{4})\\] \"(?P<action>\\S+)\\s?(?P<path>\\S+)?\\s?(?P<protocol>\\S+)?\" (?P<status>\\d{3}|-) (?P<size>\\d+|-)\\s?\"?(?P<referer>[^\"]*)\"?\\s?\"?(?P<useragent>[^\"]*)?\"?$"
timestamp:
source: time
format: RFC3339
source: timestamp
format: "02/Jan/2006:15:04:05 -0700"
labels:
stream:
source: json_key_name.json_sub_key_name
output:
source: log
action:
source: "action"
status_code:
source: "status"
`
func TestNewPipeline(t *testing.T) {
......@@ -39,3 +43,60 @@ func TestNewPipeline(t *testing.T) {
t.Fatal("missing stages")
}
}
func TestPipeline_MultiStage(t *testing.T) {
est, err := time.LoadLocation("America/New_York")
if err != nil {
t.Fatal("could not parse timestamp", err)
}
var config map[string]interface{}
err = yaml.Unmarshal([]byte(testYaml), &config)
if err != nil {
panic(err)
}
p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}))
if err != nil {
panic(err)
}
tests := map[string]struct {
entry string
expectedEntry string
t time.Time
expectedT time.Time
labels model.LabelSet
expectedLabels model.LabelSet
}{
"happy path": {
rawTestLine,
processedTestLine,
time.Now(),
time.Date(2000, 01, 25, 14, 00, 01, 0, est),
map[model.LabelName]model.LabelValue{
"test": "test",
},
map[model.LabelName]model.LabelValue{
"test": "test",
"stream": "stderr",
"action": "GET",
"status_code": "200",
},
},
}
for tName, tt := range tests {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p.Process(tt.labels, &tt.t, &tt.entry)
assert.Equal(t, tt.expectedLabels, tt.labels, "did not get expected labels")
assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry")
if tt.t.Unix() != tt.expectedT.Unix() {
t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t)
}
})
}
}
package stages
import (
"github.com/go-kit/kit/log"
)
// NewDocker creates a Docker json log format specific pipeline stage.
func NewDocker(logger log.Logger) (Stage, error) {
config := map[string]interface{}{
"timestamp": map[string]interface{}{
"source": "time",
"format": "RFC3339",
},
"labels": map[string]interface{}{
"stream": map[string]interface{}{
"source": "stream",
},
},
"output": map[string]interface{}{
"source": "log",
},
}
return NewJSON(logger, config)
}
// NewCri creates a CRI format specific pipeline stage
func NewCri(logger log.Logger) (Stage, error) {
config := map[string]interface{}{
"expression": "^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<flags>\\S+?) (?P<content>.*)$",
"timestamp": map[string]interface{}{
"source": "time",
"format": "RFC3339Nano",
},
"labels": map[string]interface{}{
"stream": map[string]interface{}{
"source": "stream",
},
},
"output": map[string]interface{}{
"source": "content",
},
}
return NewRegex(logger, config)
}
package stages
import (
"testing"
"time"
"github.com/cortexproject/cortex/pkg/util"
"github.com/stretchr/testify/assert"
)
var (
dockerRaw = `{"log":"level=info ts=2019-04-30T02:12:41.844179Z caller=filetargetmanager.go:180 msg=\"Adding target\" key=\"{com_docker_deploy_namespace=\\\"docker\\\", com_docker_fry=\\\"compose.api\\\", com_docker_image_tag=\\\"v0.4.12\\\", container_name=\\\"compose\\\", instance=\\\"compose-api-cbff6dfc9-cqfr8\\\", job=\\\"docker/compose-api\\\", namespace=\\\"docker\\\", pod_template_hash=\\\"769928975\\\"}\"\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z"}`
dockerProcessed = `level=info ts=2019-04-30T02:12:41.844179Z caller=filetargetmanager.go:180 msg="Adding target" key="{com_docker_deploy_namespace=\"docker\", com_docker_fry=\"compose.api\", com_docker_image_tag=\"v0.4.12\", container_name=\"compose\", instance=\"compose-api-cbff6dfc9-cqfr8\", job=\"docker/compose-api\", namespace=\"docker\", pod_template_hash=\"769928975\"}"
`
dockerInvalidTimestampRaw = `{"log":"log message\n","stream":"stderr","time":"hi!"}`
dockerTestTimeNow = time.Now()
)
func TestNewDocker(t *testing.T) {
loc, err := time.LoadLocation("UTC")
if err != nil {
t.Fatal("could not parse timezone", err)
}
tests := map[string]struct {
entry string
expectedEntry string
t time.Time
expectedT time.Time
labels map[string]string
expectedLabels map[string]string
}{
"happy path": {
dockerRaw,
dockerProcessed,
time.Now(),
time.Date(2019, 4, 30, 02, 12, 41, 844351500, loc),
map[string]string{},
map[string]string{
"stream": "stderr",
},
},
"invalid timestamp": {
dockerInvalidTimestampRaw,
"log message\n",
dockerTestTimeNow,
dockerTestTimeNow,
map[string]string{},
map[string]string{
"stream": "stderr",
},
},
"invalid json": {
"i'm not json!",
"i'm not json!",
dockerTestTimeNow,
dockerTestTimeNow,
map[string]string{},
map[string]string{},
},
}
for tName, tt := range tests {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewDocker(util.Logger)
if err != nil {
t.Fatalf("failed to create Docker parser: %s", err)
}
lbs := toLabelSet(tt.labels)
p.Process(lbs, &tt.t, &tt.entry)
assertLabels(t, tt.expectedLabels, lbs)
assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry")
if tt.t.Unix() != tt.expectedT.Unix() {
t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t)
}
})
}
}
var (
criTestTimeStr = "2019-01-01T01:00:00.000000001Z"
criTestTime, _ = time.Parse(time.RFC3339Nano, criTestTimeStr)
criTestTime2 = time.Now()
)
func TestNewCri(t *testing.T) {
tests := map[string]struct {
entry string
expectedEntry string
t time.Time
expectedT time.Time
labels map[string]string
expectedLabels map[string]string
}{
"happy path": {
criTestTimeStr + " stderr P message",
"message",
time.Now(),
criTestTime,
map[string]string{},
map[string]string{
"stream": "stderr",
},
},
"multi line pass": {
criTestTimeStr + " stderr P message\nmessage2",
"message\nmessage2",
time.Now(),
criTestTime,
map[string]string{},
map[string]string{
"stream": "stderr",
},
},
"invalid timestamp": {
"3242 stderr P message",
"message",
criTestTime2,
criTestTime2,
map[string]string{},
map[string]string{
"stream": "stderr",
},
},
"invalid line": {
"i'm invalid!!!",
"i'm invalid!!!",
criTestTime2,
criTestTime2,
map[string]string{},
map[string]string{},
},
}
for tName, tt := range tests {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewCri(util.Logger)
if err != nil {
t.Fatalf("failed to create CRI parser: %s", err)
}
lbs := toLabelSet(tt.labels)
p.Process(lbs, &tt.t, &tt.entry)
assertLabels(t, tt.expectedLabels, lbs)
assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry")
if tt.t.Unix() != tt.expectedT.Unix() {
t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t)
}
})
}
}
......@@ -153,6 +153,7 @@ func (j *jsonStage) getJSONValue(expr *string, data map[string]interface{}) (res
func (j *jsonStage) Process(labels model.LabelSet, t *time.Time, entry *string) {
if entry == nil {
level.Debug(j.logger).Log("msg", "cannot parse a nil entry")
return
}
var data map[string]interface{}
if err := json.Unmarshal([]byte(*entry), &data); err != nil {
......
......@@ -6,6 +6,7 @@ import (
"time"
"github.com/cortexproject/cortex/pkg/util"
"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v2"
)
......@@ -37,7 +38,7 @@ func TestYamlMapStructure(t *testing.T) {
}
want := &JSONConfig{
Labels: map[string]*JSONLabel{
"stream": &JSONLabel{
"stream": {
Source: String("json_key_name.json_sub_key_name"),
},
},
......@@ -313,6 +314,7 @@ func TestJSONParser_Parse(t *testing.T) {
for tName, tt := range tests {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewJSON(util.Logger, tt.config)
if err != nil {
t.Fatalf("failed to create json parser: %s", err)
......@@ -321,9 +323,7 @@ func TestJSONParser_Parse(t *testing.T) {
p.Process(lbs, &tt.t, &tt.entry)
assertLabels(t, tt.expectedLabels, lbs)
if tt.entry != tt.expectedEntry {
t.Fatalf("mismatch entry want: %s got:%s", tt.expectedEntry, tt.entry)
}
assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry")
if tt.t.Unix() != tt.expectedT.Unix() {
t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t)
}
......
......@@ -45,6 +45,7 @@ func newRegexConfig(config interface{}) (*RegexConfig, error) {
return cfg, nil
}
// Config Errors
const (
ErrExpressionRequired = "expression is required"
ErrCouldNotCompileRegex = "could not compile regular expression"
......@@ -116,6 +117,7 @@ type regexStage struct {
logger log.Logger
}
// NewRegex creates a new regular expression based pipeline processing stage.
func NewRegex(logger log.Logger, config interface{}) (Stage, error) {
cfg, err := newRegexConfig(config)
if err != nil {
......@@ -132,6 +134,7 @@ func NewRegex(logger log.Logger, config interface{}) (Stage, error) {
}, nil
}
// Process implements a pipeline stage
func (r *regexStage) Process(labels model.LabelSet, t *time.Time, entry *string) {
if entry == nil {
level.Debug(r.logger).Log("msg", "cannot parse a nil entry")
......
......@@ -8,6 +8,7 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v2"
)
......@@ -39,7 +40,7 @@ func TestRegexMapStructure(t *testing.T) {
}
want := &RegexConfig{
Labels: map[string]*RegexLabel{
"stream": &RegexLabel{
"stream": {
Source: String("stream"),
},
},
......@@ -195,9 +196,11 @@ func TestRegexConfig_validate(t *testing.T) {
}
}
var regexLogFixture = `11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] "GET /1986.js HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"`
var regexLogFixture_missingLabel = `2016-10-06T00:17:09.669794202Z stdout k `
var regexLogFixture_invalidTimestamp = `2016-10-06sfsT00:17:09.669794202Z stdout k `
var (
regexLogFixture = `11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] "GET /1986.js HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"`
regexLogFixtureMissingLabel = `2016-10-06T00:17:09.669794202Z stdout k `
regexLogFixtureInvalidTimestamp = `2016-10-06sfsT00:17:09.669794202Z stdout k `
)
func TestRegexParser_Parse(t *testing.T) {
t.Parallel()
......@@ -292,8 +295,8 @@ func TestRegexParser_Parse(t *testing.T) {
},
},
},
regexLogFixture_missingLabel,
regexLogFixture_missingLabel,
regexLogFixtureMissingLabel,
regexLogFixtureMissingLabel,
time.Now(),
time.Date(2016, 10, 06, 00, 17, 9, 669794202, utc),
nil,
......@@ -315,8 +318,8 @@ func TestRegexParser_Parse(t *testing.T) {
},
},
},
regexLogFixture_missingLabel,
regexLogFixture_missingLabel,
regexLogFixtureInvalidTimestamp,
regexLogFixtureInvalidTimestamp,
time.Date(2019, 10, 06, 00, 17, 9, 0, utc),
time.Date(2019, 10, 06, 00, 17, 9, 0, utc),
nil,
......@@ -352,6 +355,7 @@ func TestRegexParser_Parse(t *testing.T) {
for tName, tt := range tests {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewRegex(util.Logger, tt.config)
if err != nil {
t.Fatalf("failed to create regex parser: %s", err)
......@@ -360,9 +364,7 @@ func TestRegexParser_Parse(t *testing.T) {
p.Process(lbs, &tt.t, &tt.entry)
assertLabels(t, tt.expectedLabels, lbs)
if tt.entry != tt.expectedEntry {
t.Fatalf("mismatch entry want: %s got:%s", tt.expectedEntry, tt.entry)
}
assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry")
if tt.t.Unix() != tt.expectedT.Unix() {
t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t)
}
......
......@@ -5,6 +5,7 @@ import (
"time"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
)
func mustParseTime(layout, value string) time.Time {
......@@ -32,8 +33,6 @@ func assertLabels(t *testing.T, expect map[string]string, got model.LabelSet) {
if !ok {
t.Fatalf("missing expected label key: %s", k)
}
if gotV != model.LabelValue(v) {
t.Fatalf("mismatch label value got: %s/%s want %s/%s", k, gotV, k, model.LabelValue(v))
}
assert.Equal(t, model.LabelValue(v), gotV, "mismatch label value")
}
}
package parser
type Label struct {
LabelName string
Source string
}
......@@ -25,7 +25,6 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/assert"
"github.com/grafana/loki/pkg/logentry"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/parser"
"github.com/grafana/loki/pkg/promtail/api"
......
......@@ -7,11 +7,13 @@ import (
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/grafana/loki/pkg/logentry"
"github.com/grafana/loki/pkg/promtail/api"
)
// Config describes a job to scrape.
type Config struct {
JobName string `yaml:"job_name,omitempty"`
EntryParser api.EntryParser `yaml:"entry_parser"`
PipelineStages logentry.PipelineStages `yaml:"pipeline_stages,omitempty"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
ServiceDiscoveryConfig sd_config.ServiceDiscoveryConfig `yaml:",inline"`
......@@ -19,7 +21,7 @@ type Config struct {
// DefaultScrapeConfig is the default Config.
var DefaultScrapeConfig = Config{
//PipelineStages: api.Docker,
EntryParser: api.Docker,
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
......
......@@ -20,6 +20,7 @@ import (
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/logentry"
"github.com/grafana/loki/pkg/logentry/stages"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/positions"
"github.com/grafana/loki/pkg/promtail/scrape"
......@@ -74,10 +75,36 @@ func NewFileTargetManager(
config := map[string]sd_config.ServiceDiscoveryConfig{}
for _, cfg := range scrapeConfigs {
pipeline, err := logentry.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages)
if err != nil {
return nil, err
}
// Backwards compatibility with old EntryParser config
if pipeline.Size() == 0 {
switch cfg.EntryParser {
case api.CRI:
level.Warn(logger).Log("msg", "WARNING!!! entry_parser config is deprecated, please change to pipeline_stages")
cri, err := stages.NewCri(logger)
if err != nil {
return nil, err
}
pipeline.AddStage(cri)
case api.Docker:
level.Warn(logger).Log("msg", "WARNING!!! entry_parser config is deprecated, please change to pipeline_stages")
docker, err := stages.NewDocker(logger)
if err != nil {
return nil, err
}
pipeline.AddStage(docker)
case api.Raw:
level.Warn(logger).Log("msg", "WARNING!!! entry_parser config is deprecated, please change to pipeline_stages")
default:
}
}
s := &targetSyncer{
log: logger,
positions: positions,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment