diff --git a/pkg/parser/entry/parsers/json.go b/pkg/parser/entry/parsers/json.go
new file mode 100644
index 0000000000000000000000000000000000000000..16261bdc0cac45388c9fbd54f3835ff4ed59c1c3
--- /dev/null
+++ b/pkg/parser/entry/parsers/json.go
@@ -0,0 +1,248 @@
+package parsers
+
+import (
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/jmespath/go-jmespath"
+	"github.com/mitchellh/mapstructure"
+	"github.com/pkg/errors"
+	"github.com/prometheus/common/model"
+)
+
+// JSONTimestamp configures timestamp extraction
+type JSONTimestamp struct {
+	Source *string `mapstructure:"source"`
+	Format string  `mapstructure:"format"`
+}
+
+// JSONLabel configures a labels value extraction
+type JSONLabel struct {
+	Source *string `mapstructure:"source"`
+}
+
+// JSONOutput configures output value extraction
+type JSONOutput struct {
+	Source *string `mapstructure:"source"`
+}
+
+// JSONConfig configures the log entry parser to extract value from json
+type JSONConfig struct {
+	Timestamp *JSONTimestamp        `mapstructure:"timestamp"`
+	Output    *JSONOutput           `mapstructure:"output"`
+	Labels    map[string]*JSONLabel `mapstructure:"labels"`
+}
+
+// newJSONConfig decodes a generic configuration value into a JSONConfig.
+func newJSONConfig(config interface{}) (*JSONConfig, error) {
+	cfg := &JSONConfig{}
+	err := mapstructure.Decode(config, cfg)
+	if err != nil {
+		return nil, err
+	}
+	return cfg, nil
+}
+
+// validate checks the config and returns the compiled jmespath expressions it
+// needs, keyed by their source string. As a side effect a named timestamp
+// format (e.g. "RFC3339") is replaced by the matching time layout.
+func (c *JSONConfig) validate() (map[string]*jmespath.JMESPath, error) {
+	if c.Output == nil && len(c.Labels) == 0 && c.Timestamp == nil {
+		return nil, errors.New("empty json parser configuration")
+	}
+
+	if c.Output != nil && (c.Output.Source == nil || *c.Output.Source == "") {
+		return nil, errors.New("output source value is required")
+	}
+
+	if c.Timestamp != nil {
+		if c.Timestamp.Source != nil && *c.Timestamp.Source == "" {
+			return nil, errors.New("timestamp source value is required")
+		}
+		if c.Timestamp.Format == "" {
+			return nil, errors.New("timestamp format is required")
+		}
+		c.Timestamp.Format = convertDateLayout(c.Timestamp.Format)
+	}
+
+	expressions := map[string]*jmespath.JMESPath{}
+	var err error
+	if c.Timestamp != nil && c.Timestamp.Source != nil {
+		expressions[*c.Timestamp.Source], err = jmespath.Compile(*c.Timestamp.Source)
+		if err != nil {
+			return nil, errors.Wrap(err, "could not compile timestamp source jmespath expression")
+		}
+	}
+	if c.Output != nil && c.Output.Source != nil {
+		expressions[*c.Output.Source], err = jmespath.Compile(*c.Output.Source)
+		if err != nil {
+			return nil, errors.Wrap(err, "could not compile output source jmespath expression")
+		}
+	}
+	for labelName, labelSrc := range c.Labels {
+		if !model.LabelName(labelName).IsValid() {
+			return nil, fmt.Errorf("invalid label name: %s", labelName)
+		}
+		if labelSrc != nil && labelSrc.Source != nil {
+			expressions[*labelSrc.Source], err = jmespath.Compile(*labelSrc.Source)
+			if err != nil {
+				return nil, errors.Wrapf(err, "could not compile label source jmespath expression: %s", labelName)
+			}
+		}
+	}
+	return expressions, nil
+}
+
+// JSONParser is a Parser that extracts the timestamp, labels and output of a
+// log entry from its JSON content, as described by a JSONConfig.
+type JSONParser struct {
+	cfg         *JSONConfig
+	expressions map[string]*jmespath.JMESPath
+	logger      log.Logger
+}
+
+// NewJson decodes and validates config and returns the matching JSON Parser.
+// An error is returned when the config cannot be decoded or is invalid.
+func NewJson(logger log.Logger, config interface{}) (Parser, error) {
+	cfg, err := newJSONConfig(config)
+	if err != nil {
+		return nil, err
+	}
+	expressions, err := cfg.validate()
+	if err != nil {
+		return nil, err
+	}
+	return &JSONParser{
+		cfg:         cfg,
+		expressions: expressions,
+		logger:      log.With(logger, "component", "parser", "type", "json"),
+	}, nil
+}
+
+// getJSONString returns the string found in data, either by evaluating expr or,
+// when expr is nil, by reading the top level key named fallback.
+// ok is false, and a warning is logged, when no string value could be found.
+func (j *JSONParser) getJSONString(expr *string, fallback string, data map[string]interface{}) (result string, ok bool) {
+	if expr == nil {
+		result, ok = data[fallback].(string)
+		if !ok {
+			level.Warn(j.logger).Log("msg", "field is not a string", "field", fallback)
+		}
+		return result, ok
+	}
+	searchResult, ok := j.getJSONValue(expr, data)
+	if !ok {
+		// getJSONValue has already logged why the search failed.
+		return "", false
+	}
+	result, ok = searchResult.(string)
+	if !ok {
+		level.Warn(j.logger).Log("msg", "search result is not a string", "expr", *expr)
+	}
+	return result, ok
+}
+
+// getJSONValue evaluates the compiled jmespath expression registered for expr
+// against data. expr must be non-nil; it is expected to be a source compiled by validate.
+// ok is false, and a warning is logged, when the search returns an error.
+func (j *JSONParser) getJSONValue(expr *string, data map[string]interface{}) (result interface{}, ok bool) {
+	result, err := j.expressions[*expr].Search(data)
+	if err != nil {
+		// log the expression itself and the error, not the pointer address.
+		level.Warn(j.logger).Log("msg", "failed to search with jmespath expression", "expr", *expr, "err", err)
+		return nil, false
+	}
+	return result, true
+}
+
+// Parse decodes entry as a JSON object and, depending on the configuration,
+// overwrites t with the extracted timestamp, adds the extracted labels to labels
+// and replaces entry with the JSON encoding of the extracted output value.
+// Values that cannot be extracted are logged and skipped; a nil entry or an
+// entry that fails to decode leaves every argument untouched.
+func (j *JSONParser) Parse(labels model.LabelSet, t *time.Time, entry *string) {
+	if entry == nil {
+		level.Warn(j.logger).Log("msg", "cannot parse a nil entry")
+		// nothing to decode, and dereferencing entry below would panic.
+		return
+	}
+	var data map[string]interface{}
+	if err := json.Unmarshal([]byte(*entry), &data); err != nil {
+		level.Warn(j.logger).Log("msg", "could not unmarshal json", "err", err)
+		return
+	}
+
+	// parsing ts
+	if j.cfg.Timestamp != nil {
+		if ts, ok := j.getJSONString(j.cfg.Timestamp.Source, "timestamp", data); ok {
+			parsedTs, err := time.Parse(j.cfg.Timestamp.Format, ts)
+			if err != nil {
+				level.Warn(j.logger).Log("msg", "failed to parse time", "err", err, "format", j.cfg.Timestamp.Format, "value", ts)
+			} else {
+				*t = parsedTs
+			}
+		}
+	}
+
+	// parsing labels
+	for lName, lSrc := range j.cfg.Labels {
+		var src *string
+		if lSrc != nil {
+			src = lSrc.Source
+		}
+		lValue, ok := j.getJSONString(src, lName, data)
+		if !ok {
+			continue
+		}
+		labelValue := model.LabelValue(lValue)
+		// seems impossible as the json.Unmarshal would yield an error first.
+		if !labelValue.IsValid() {
+			level.Warn(j.logger).Log("msg", "invalid label value parsed", "value", labelValue)
+			continue
+		}
+		labels[model.LabelName(lName)] = labelValue
+	}
+
+	// parsing output
+	if j.cfg.Output != nil {
+		if jsonObj, ok := j.getJSONValue(j.cfg.Output.Source, data); ok && jsonObj != nil {
+			b, err := json.Marshal(jsonObj)
+			if err != nil {
+				level.Warn(j.logger).Log("msg", "could not marshal output value", "err", err)
+				return
+			}
+			*entry = string(b)
+		}
+	}
+}
+
+// convertDateLayout converts pre-defined date format layout into date format
+func convertDateLayout(predef string) string {
+	switch predef {
+	case "ANSIC":
+		return time.ANSIC
+	case "UnixDate":
+		return time.UnixDate
+	case "RubyDate":
+		return time.RubyDate
+	case "RFC822":
+		return time.RFC822
+	case "RFC822Z":
+		return time.RFC822Z
+	case "RFC850":
+		return time.RFC850
+	case "RFC1123":
+		return time.RFC1123
+	case "RFC1123Z":
+		return time.RFC1123Z
+	case "RFC3339":
+		return time.RFC3339
+	case "RFC3339Nano":
+		return time.RFC3339Nano
+	default:
+		return predef
+	}
+}
diff --git a/pkg/parser/entry/parsers/json_test.go b/pkg/parser/entry/parsers/json_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..56bf4db5e4aefb1ffc6b6527020ff9314f9401a3
--- /dev/null
+++ b/pkg/parser/entry/parsers/json_test.go
@@ -0,0 +1,371 @@
+package parsers
+
+import (
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/prometheus/common/model"
+	"gopkg.in/yaml.v2"
+)
+
+var cfg = `json:
+  timestamp:
+    source: time
+    format: RFC3339
+  labels:
+    stream:
+      source: json_key_name.json_sub_key_name
+  output:
+    source: log`
+
+func TestYamlMapStructure(t *testing.T) {
+	t.Parallel()
+
+	// testing that we can use yaml data into mapstructure.
+	var mapstruct map[interface{}]interface{}
+	if err := yaml.Unmarshal([]byte(cfg), &mapstruct); err != nil {
+		t.Fatalf("error while un-marshalling config: %s", err)
+	}
+	p, ok := mapstruct["json"].(map[interface{}]interface{})
+	if !ok {
+		t.Fatalf("could not read parser %+v", mapstruct["json"])
+	}
+	got, err := newJSONConfig(p)
+	if err != nil {
+		t.Fatalf("could not create parser from yaml: %s", err)
+	}
+	want := &JSONConfig{
+		Labels: map[string]*JSONLabel{
+			"stream": &JSONLabel{
+				Source: String("json_key_name.json_sub_key_name"),
+			},
+		},
+		Output: &JSONOutput{Source: String("log")},
+		Timestamp: &JSONTimestamp{
+			Format: "RFC3339",
+			Source: String("time"),
+		},
+	}
+	if !reflect.DeepEqual(got, want) {
+		t.Fatalf("want: %+v got: %+v", want, got)
+	}
+}
+
+func String(s string) *string {
+	return &s
+}
+
+func TestJSONConfig_validate(t *testing.T) {
+	t.Parallel()
+	tests := map[string]struct {
+		config        interface{}
+		wantExprCount int
+		wantErr       bool
+	}{
+		"empty": {
+			map[string]interface{}{},
+			0,
+			true,
+		},
+		"missing output info": {
+			map[string]interface{}{
+				"output": map[string]interface{}{},
+			},
+			0,
+			true,
+		},
+		"missing output source": {
+			map[string]interface{}{
+				"output": map[string]interface{}{
+					"source": "",
+				},
+			},
+			0,
+			true,
+		},
+		"invalid output source": {
+			map[string]interface{}{
+				"output": map[string]interface{}{
+					"source": "[",
+				},
+			},
+			0,
+			true,
+		},
+		"missing timestamp source": {
+			map[string]interface{}{
+				"timestamp": map[string]interface{}{
+					"source": "",
+					"format": "ANSIC",
+				},
+			},
+			0,
+			true,
+		},
+		"invalid timestamp source": {
+			map[string]interface{}{
+				"timestamp": map[string]interface{}{
+					"source": "[",
+					"format": "ANSIC",
+				},
+			},
+			0,
+			true,
+		},
+		"missing timestamp format": {
+			map[string]interface{}{
+				"timestamp": map[string]interface{}{
+					"source": "test",
+					"format": "",
+				},
+			},
+			0,
+			true,
+		},
+		"invalid label name": {
+			map[string]interface{}{
+				"labels": map[string]interface{}{
+					"": map[string]interface{}{},
+				},
+			},
+			0,
+			true,
+		},
+		"invalid label source": {
+			map[string]interface{}{
+				"labels": map[string]interface{}{
+					"stream": map[string]interface{}{
+						"source": "]",
+					},
+				},
+			},
+			0,
+			true,
+		},
+		"valid": {
+			map[string]interface{}{
+				"output": map[string]interface{}{
+					"source": "log.msg[0]",
+				},
+				"timestamp": map[string]interface{}{
+					"source": "log.ts",
+					"format": "RFC3339",
+				},
+				"labels": map[string]interface{}{
+					"stream": map[string]interface{}{
+						"source": "test",
+					},
+					"app": map[string]interface{}{
+						"source": "component.app",
+					},
+					"level": nil,
+				},
+			},
+			4,
+			false,
+		},
+	}
+	for tName, tt := range tests {
+		tt := tt
+		t.Run(tName, func(t *testing.T) {
+			c, err := newJSONConfig(tt.config)
+			if err != nil {
+				t.Fatalf("failed to create config: %s", err)
+			}
+			got, err := c.validate()
+			if (err != nil) != tt.wantErr {
+				t.Errorf("JSONConfig.validate() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if len(got) != tt.wantExprCount {
+				t.Errorf("expressions count = %v, want %v", len(got), tt.wantExprCount)
+			}
+		})
+	}
+}
+
+var logFixture = `
+{
+	"time":"2012-11-01T22:08:41+00:00",
+	"app":"loki",
+	"component": ["parser","type"],
+	"level" : "WARN",
+	"nested" : {"child":"value"},
+	"message" : "this is a log line",
+	"complex" : {
+		"log" : {"array":[{"test1":"test2"},{"test3":"test4"}],"prop":"value","prop2":"val2"}
+	}
+}
+`
+
+func TestJSONParser_Parse(t *testing.T) {
+	t.Parallel()
+
+	tests := map[string]struct {
+		config         interface{}
+		entry          string
+		expectedEntry  string
+		t              time.Time
+		expectedT      time.Time
+		labels         map[string]string
+		expectedLabels map[string]string
+	}{
+		"replace all": {
+			map[string]interface{}{
+				"labels": map[string]interface{}{
+					"level": nil,
+					"component": map[string]interface{}{
+						"source": "component[0]",
+					},
+					"module": map[string]interface{}{
+						"source": "app",
+					},
+				},
+				"output": map[string]interface{}{
+					"source": "complex.log",
+				},
+				"timestamp": map[string]interface{}{
+					"source": "time",
+					"format": "RFC3339",
+				},
+			},
+			logFixture,
+			`{"array":[{"test1":"test2"},{"test3":"test4"}],"prop":"value","prop2":"val2"}`,
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			mustParseTime(time.RFC3339, "2012-11-01T22:08:41+00:00"),
+			map[string]string{"stream": "stdout"},
+			map[string]string{"stream": "stdout", "level": "WARN", "component": "parser", "module": "loki"},
+		},
+		"invalid json": {
+			map[string]interface{}{
+				"labels": map[string]interface{}{
+					"level": nil,
+				},
+			},
+			"ts=now log=notjson",
+			"ts=now log=notjson",
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			map[string]string{"stream": "stdout"},
+			map[string]string{"stream": "stdout"},
+		},
+		"invalid timestamp skipped": {
+			map[string]interface{}{
+				"labels": map[string]interface{}{
+					"level": nil,
+				},
+				"timestamp": map[string]interface{}{
+					"source": "time",
+					"format": "invalid",
+				},
+			},
+			logFixture,
+			logFixture,
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			map[string]string{"stream": "stdout"},
+			map[string]string{"stream": "stdout", "level": "WARN"},
+		},
+		"invalid labels skipped": {
+			map[string]interface{}{
+				"labels": map[string]interface{}{
+					"notexisting": nil,
+					"level": map[string]interface{}{
+						"source": "doesnotexist",
+					},
+				},
+			},
+			logFixture,
+			logFixture,
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			map[string]string{"stream": "stdout"},
+			map[string]string{"stream": "stdout"},
+		},
+		"invalid output skipped": {
+			map[string]interface{}{
+				"output": map[string]interface{}{
+					"source": "doesnotexist",
+				},
+			},
+			logFixture,
+			logFixture,
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"),
+			map[string]string{"stream": "stdout"},
+			map[string]string{"stream": "stdout"},
+		},
+	}
+	for tName, tt := range tests {
+		tt := tt
+		t.Run(tName, func(t *testing.T) {
+			p, err := NewJson(util.Logger, tt.config)
+			if err != nil {
+				t.Fatalf("failed to create json parser: %s", err)
+			}
+			lbs := toLabelSet(tt.labels)
+			p.Parse(lbs, &tt.t, &tt.entry)
+
+			assertLabels(t, tt.expectedLabels, lbs)
+			if tt.entry != tt.expectedEntry {
+				t.Fatalf("mismatch entry want: %s got:%s", tt.expectedEntry, tt.entry)
+			}
+			if tt.t.Unix() != tt.expectedT.Unix() {
+				t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t)
+			}
+		})
+	}
+}
+
+func TestJSONParser_Parse_nilEntry(t *testing.T) {
+	t.Parallel()
+
+	p, err := NewJson(util.Logger, map[string]interface{}{
+		"labels": map[string]interface{}{
+			"level": nil,
+		},
+	})
+	if err != nil {
+		t.Fatalf("failed to create json parser: %s", err)
+	}
+	lbs := toLabelSet(map[string]string{"stream": "stdout"})
+	ts := mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00")
+
+	// a nil entry must be skipped without panicking.
+	p.Parse(lbs, &ts, nil)
+
+	assertLabels(t, map[string]string{"stream": "stdout"}, lbs)
+}
+
+func mustParseTime(layout, value string) time.Time {
+	t, err := time.Parse(layout, value)
+	if err != nil {
+		panic(err)
+	}
+	return t
+}
+
+func toLabelSet(lbs map[string]string) model.LabelSet {
+	res := model.LabelSet{}
+	for k, v := range lbs {
+		res[model.LabelName(k)] = model.LabelValue(v)
+	}
+	return res
+}
+
+func assertLabels(t *testing.T, expect map[string]string, got model.LabelSet) {
+	if len(expect) != len(got) {
+		t.Fatalf("labels are not equal in size want: %s got: %s", expect, got)
+	}
+	for k, v := range expect {
+		gotV, ok := got[model.LabelName(k)]
+		if !ok {
+			t.Fatalf("missing expected label key: %s", k)
+		}
+		if gotV != model.LabelValue(v) {
+			t.Fatalf("mismatch label value got: %s/%s want %s/%s", k, gotV, k, model.LabelValue(v))
+		}
+	}
+}
diff --git a/pkg/parser/entry/parsers/parsers.go b/pkg/parser/entry/parsers/parsers.go
new file mode 100644
index 0000000000000000000000000000000000000000..2f1bfbca4731e70c011d2308af8fa001a6de815e
--- /dev/null
+++ b/pkg/parser/entry/parsers/parsers.go
@@ -0,0 +1,14 @@
+package parsers
+
+import (
+	"time"
+
+	"github.com/prometheus/common/model"
+)
+
+// Parser takes an existing set of labels, timestamp and log entry and possibly mutates
+// the labels, the timestamp and the log entry in place.
+type Parser interface {
+	//TODO decide on how to handle labels as a pointer or not
+	Parse(labels model.LabelSet, time *time.Time, entry *string)
+}
diff --git a/pkg/parser/entry/parsers/regex.go b/pkg/parser/entry/parsers/regex.go
index 7840570f769a43ea1ed27db076503d3eb52df751..d7218d7aa7ad446bf8ff8ffb9893cfd1fd789e9d 100644
--- a/pkg/parser/entry/parsers/regex.go
+++ b/pkg/parser/entry/parsers/regex.go
@@ -2,10 +2,6 @@ package parsers
 
 import (
 	"regexp"
-	"time"
-
-	"github.com/mitchellh/mapstructure"
-	"github.com/prometheus/common/model"
 
 	"github.com/grafana/loki/pkg/parser"
 )
@@ -19,14 +15,14 @@ type Regex struct {
 	expr *regexp.Regexp
 }
 
-func NewRegex(config map[interface{}]interface{}) Regex {
+// func NewRegex(config map[interface{}]interface{}) Regex {
 
-	err := mapstructure.Decode(rg, &cfg2)
-	return Regex{
-		expr: regexp.MustCompile(config.Expr),
-	}
-}
+// 	err := mapstructure.Decode(rg, &cfg2)
+// 	return Regex{
+// 		expr: regexp.MustCompile(config.Expr),
+// 	}
+// }
 
-func (r *Regex) Parse(labels model.LabelSet, time time.Time, entry string) (time.Time, string, error) {
+// func (r *Regex) Parse(labels model.LabelSet, time time.Time, entry string) (time.Time, string, error) {
 
-}
+// }
diff --git a/pkg/parser/entry/processor.go b/pkg/parser/entry/processor.go
index c5890a9e71a7d73900c26d3ef35f51c7ccbee665..264e7e3a787ae20cd7e5f629b9e82e03aa852af5 100644
--- a/pkg/parser/entry/processor.go
+++ b/pkg/parser/entry/processor.go
@@ -9,13 +9,6 @@ import (
 	"github.com/grafana/loki/pkg/parser/entry/parsers"
 )
 
-// Parser takes an existing set of labels, timestamp and log entry and returns either a possibly mutated
-// timestamp and log entry
-type Parser interface {
-	//TODO decide on how to handle labels as a pointer or not
-	Parse(labels model.LabelSet, time time.Time, entry string) (time.Time, string, error)
-}
-
 type Config struct {
 	//FIXME do we keep the yaml the same? we have to accommodate the kube label parsing happening first (so we can act on those labels)
 	ParserStages []map[interface{}]interface{} `yaml:"parser_stages"`