diff --git a/pkg/logentry/pipeline.go b/pkg/logentry/pipeline.go index c931e7146e6225ff9fa249548bcc633b5988bcd0..cec7cf997ecfc03bb8b96933b9937f3ec61b7e15 100644 --- a/pkg/logentry/pipeline.go +++ b/pkg/logentry/pipeline.go @@ -10,12 +10,15 @@ import ( "github.com/prometheus/common/model" ) +// PipelineStages contains configuration for each stage within a pipeline type PipelineStages []interface{} +// Pipeline pass down a log entry to each stage for mutation. type Pipeline struct { stages []stages.Stage } +// NewPipeline creates a new log entry pipeline from a configuration func NewPipeline(log log.Logger, stgs PipelineStages) (*Pipeline, error) { st := []stages.Stage{} for _, s := range stgs { @@ -33,7 +36,7 @@ func NewPipeline(log log.Logger, stgs PipelineStages) (*Pipeline, error) { } switch name { case "json": - json, err := stages.NewJson(log, config) + json, err := stages.NewJSON(log, config) if err != nil { return nil, errors.Wrap(err, "invalid json stage config") } @@ -47,6 +50,7 @@ func NewPipeline(log log.Logger, stgs PipelineStages) (*Pipeline, error) { }, nil } +// Process mutates an entry and its metadata by using multiple configure stage. func (p *Pipeline) Process(labels model.LabelSet, time *time.Time, entry *string) { //debug log labels, time, and string for _, stage := range p.stages { @@ -55,6 +59,7 @@ func (p *Pipeline) Process(labels model.LabelSet, time *time.Time, entry *string } } +// Wrap implements EntryMiddleware func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler { return api.EntryHandlerFunc(func(labels model.LabelSet, timestamp time.Time, line string) error { p.Process(labels, ×tamp, &line) diff --git a/pkg/logentry/stages/json.go b/pkg/logentry/stages/json.go index f04eb522f24bfece623c3317d0371b50d37e0edd..49d28af5253ebade74eed7a9ac2d99e0a90a659f 100644 --- a/pkg/logentry/stages/json.go +++ b/pkg/logentry/stages/json.go @@ -93,13 +93,14 @@ func (c *JSONConfig) validate() (map[string]*jmespath.JMESPath, error) { return expressions, nil } -type JSONParser struct { +type jsonStage struct { cfg *JSONConfig expressions map[string]*jmespath.JMESPath logger log.Logger } -func NewJson(logger log.Logger, config interface{}) (Stage, error) { +// NewJSON creates a new json stage from a config. +func NewJSON(logger log.Logger, config interface{}) (Stage, error) { cfg, err := newJSONConfig(config) if err != nil { return nil, err @@ -108,53 +109,54 @@ func NewJson(logger log.Logger, config interface{}) (Stage, error) { if err != nil { return nil, err } - return &JSONParser{ + return &jsonStage{ cfg: cfg, expressions: expressions, logger: log.With(logger, "component", "parser", "type", "json"), }, nil } -func (j *JSONParser) getJSONString(expr *string, fallback string, data map[string]interface{}) (result string, ok bool) { +func (j *jsonStage) getJSONString(expr *string, fallback string, data map[string]interface{}) (result string, ok bool) { if expr == nil { result, ok = data[fallback].(string) if !ok { - level.Warn(j.logger).Log("msg", "field is not a string", "field", fallback) + level.Debug(j.logger).Log("msg", "field is not a string", "field", fallback) } } else { var searchResult interface{} searchResult, ok = j.getJSONValue(expr, data) if !ok { - level.Warn(j.logger).Log("msg", "failed to search with jmespath expression", "expr", expr) + level.Debug(j.logger).Log("msg", "failed to search with jmespath expression", "expr", expr) return } result, ok = searchResult.(string) if !ok { - level.Warn(j.logger).Log("msg", "search result is not a string", "expr", *expr) + level.Debug(j.logger).Log("msg", "search result is not a string", "expr", *expr) } } return } -func (j *JSONParser) getJSONValue(expr *string, data map[string]interface{}) (result interface{}, ok bool) { +func (j *jsonStage) getJSONValue(expr *string, data map[string]interface{}) (result interface{}, ok bool) { var err error ok = true result, err = j.expressions[*expr].Search(data) if err != nil { - level.Warn(j.logger).Log("msg", "failed to search with jmespath expression", "expr", expr) + level.Debug(j.logger).Log("msg", "failed to search with jmespath expression", "expr", expr) ok = false return } return } -func (j *JSONParser) Process(labels model.LabelSet, t *time.Time, entry *string) { +// Process implement a pipeline stage +func (j *jsonStage) Process(labels model.LabelSet, t *time.Time, entry *string) { if entry == nil { - level.Warn(j.logger).Log("msg", "cannot parse a nil entry") + level.Debug(j.logger).Log("msg", "cannot parse a nil entry") } var data map[string]interface{} if err := json.Unmarshal([]byte(*entry), &data); err != nil { - level.Warn(j.logger).Log("msg", "could not unmarshal json", "err", err) + level.Debug(j.logger).Log("msg", "could not unmarshal json", "err", err) return } @@ -163,7 +165,7 @@ func (j *JSONParser) Process(labels model.LabelSet, t *time.Time, entry *string) if ts, ok := j.getJSONString(j.cfg.Timestamp.Source, "timestamp", data); ok { parsedTs, err := time.Parse(j.cfg.Timestamp.Format, ts) if err != nil { - level.Warn(j.logger).Log("msg", "failed to parse time", "err", err, "format", j.cfg.Timestamp.Format, "value", ts) + level.Debug(j.logger).Log("msg", "failed to parse time", "err", err, "format", j.cfg.Timestamp.Format, "value", ts) } else { *t = parsedTs } @@ -183,7 +185,7 @@ func (j *JSONParser) Process(labels model.LabelSet, t *time.Time, entry *string) labelValue := model.LabelValue(lValue) // seems impossible as the json.Unmarshal would yield an error first. if !labelValue.IsValid() { - level.Warn(j.logger).Log("msg", "invalid label value parsed", "value", labelValue) + level.Debug(j.logger).Log("msg", "invalid label value parsed", "value", labelValue) continue } labels[model.LabelName(lName)] = labelValue @@ -192,9 +194,13 @@ func (j *JSONParser) Process(labels model.LabelSet, t *time.Time, entry *string) // parsing output if j.cfg.Output != nil { if jsonObj, ok := j.getJSONValue(j.cfg.Output.Source, data); ok && jsonObj != nil { + if s, ok := jsonObj.(string); ok { + *entry = s + return + } b, err := json.Marshal(jsonObj) if err != nil { - level.Warn(j.logger).Log("msg", "could not marshal output value", "err", err) + level.Debug(j.logger).Log("msg", "could not marshal output value", "err", err) return } *entry = string(b) diff --git a/pkg/logentry/stages/json_test.go b/pkg/logentry/stages/json_test.go index 5c1a64ba999a6c619151d8878d093b834414d804..40d3aa5dcf2ef94530f871267e0fa3805855f608 100644 --- a/pkg/logentry/stages/json_test.go +++ b/pkg/logentry/stages/json_test.go @@ -297,16 +297,29 @@ func TestJSONParser_Parse(t *testing.T) { map[string]string{"stream": "stdout"}, map[string]string{"stream": "stdout"}, }, + "string output": { + map[string]interface{}{ + "output": map[string]interface{}{ + "source": "message", + }, + }, + logFixture, + "this is a log line", + mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"), + mustParseTime(time.RFC3339, "2019-03-28T11:29:10+07:00"), + map[string]string{"stream": "stdout"}, + map[string]string{"stream": "stdout"}, + }, } for tName, tt := range tests { tt := tt t.Run(tName, func(t *testing.T) { - p, err := NewJson(util.Logger, tt.config) + p, err := NewJSON(util.Logger, tt.config) if err != nil { t.Fatalf("failed to create json parser: %s", err) } lbs := toLabelSet(tt.labels) - p.Parse(lbs, &tt.t, &tt.entry) + p.Process(lbs, &tt.t, &tt.entry) assertLabels(t, tt.expectedLabels, lbs) if tt.entry != tt.expectedEntry {