Skip to content
Snippets Groups Projects
Commit da4964dd authored by Aditya C S's avatar Aditya C S Committed by Ed
Browse files

Add pipeline unit testing to promtail

parent e75acd79
No related branches found
No related tags found
No related merge requests found
......@@ -8,6 +8,7 @@ import (
"net/http"
"os"
"path/filepath"
"reflect"
"sync"
"testing"
"time"
......@@ -25,6 +26,7 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/assert"
"github.com/grafana/loki/pkg/logentry/stages"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/config"
......@@ -62,9 +64,10 @@ func TestPromtail(t *testing.T) {
}
handler := &testServerHandler{
receivedMap: map[string][]logproto.Entry{},
recMtx: sync.Mutex{},
t: t,
receivedMap: map[string][]logproto.Entry{},
receivedLabels: map[string][]labels.Labels{},
recMtx: sync.Mutex{},
t: t,
}
http.Handle("/api/prom/push", handler)
defer func() {
......@@ -121,6 +124,36 @@ func TestPromtail(t *testing.T) {
prefix4 := "sub"
expectedCounts[logFile4] = subdirSingleFile(t, logFile4, prefix4)
logFile5 := testDir + "/testPipeline.log"
entries := []string{
`{"log":"11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] \"GET /1986.js HTTP/1.1\" 200 932 \"-\" \"Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6\"","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z"}`,
`{"log":"11.11.11.12 - - [19/May/2015:04:05:16 -0500] \"POST /blog HTTP/1.1\" 200 10975 \"http://grafana.com/test/\" \"Mozilla/5.0 (Windows NT 6.1; WOW64) Gecko/20091221 Firefox/3.5.7 GTB6\"","stream":"stdout","time":"2019-04-30T02:12:42.8443515Z"}`,
}
expectedCounts[logFile5] = pipelineFile(t, logFile5, entries)
expectedEntries := []string{
`11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] "GET /1986.js HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"`,
`11.11.11.12 - - [19/May/2015:04:05:16 -0500] "POST /blog HTTP/1.1" 200 10975 "http://grafana.com/test/" "Mozilla/5.0 (Windows NT 6.1; WOW64) Gecko/20091221 Firefox/3.5.7 GTB6"`,
}
expectedLabels := []labels.Labels{}
expectedLabels = append(expectedLabels, labels.Labels{
labels.Label{Name: "action", Value: "GET"},
labels.Label{Name: "filename", Value: dirName + "/logs/testPipeline.log"},
labels.Label{Name: "job", Value: "varlogs"},
labels.Label{Name: "localhost", Value: ""},
labels.Label{Name: "match", Value: "true"},
labels.Label{Name: "stream", Value: "stderr"},
})
expectedLabels = append(expectedLabels, labels.Labels{
labels.Label{Name: "action", Value: "POST"},
labels.Label{Name: "filename", Value: dirName + "/logs/testPipeline.log"},
labels.Label{Name: "job", Value: "varlogs"},
labels.Label{Name: "localhost", Value: ""},
labels.Label{Name: "match", Value: "true"},
labels.Label{Name: "stream", Value: "stdout"},
})
// Wait for all lines to be received.
if err := waitForEntries(20, handler, expectedCounts); err != nil {
t.Fatal("Timed out waiting for log entries: ", err)
......@@ -141,11 +174,11 @@ func TestPromtail(t *testing.T) {
p.Shutdown()
// Verify.
verifyFile(t, expectedCounts[logFile1], prefix1, handler.receivedMap[logFile1])
verifyFile(t, expectedCounts[logFile2], prefix2, handler.receivedMap[logFile2])
verifyFile(t, expectedCounts[logFile3], prefix3, handler.receivedMap[logFile3])
verifyFile(t, expectedCounts[logFile4], prefix4, handler.receivedMap[logFile4])
verifyPipeline(t, expectedCounts[logFile5], expectedEntries, handler.receivedMap[logFile5], handler.receivedLabels[logFile5], expectedLabels)
if len(handler.receivedMap) != len(expectedCounts) {
t.Error("Somehow we ended up tailing more files than we were supposed to, this is likely a bug")
......@@ -188,6 +221,21 @@ func verifyFile(t *testing.T, expected int, prefix string, entries []logproto.En
}
}
func verifyPipeline(t *testing.T, expected int, expectedEntries []string, entries []logproto.Entry, labels []labels.Labels, expectedLabels []labels.Labels) {
for i := 0; i < expected; i++ {
if !reflect.DeepEqual(labels[i], expectedLabels[i]) {
t.Errorf("Did not receive expected labels, expected %s, received %s", labels[i], expectedLabels[i])
}
}
for i := 0; i < expected; i++ {
if entries[i].Line != expectedEntries[i] {
t.Errorf("Did not receive expected log entry, expected %s, received %s", expectedEntries[i], entries[i].Line)
}
}
}
func verifyMetricAbsent(t *testing.T, metrics map[string]float64, metric string, label string) {
if _, ok := metrics[label]; ok {
t.Error("Found metric", metric, "with label", label, "which was not expected, "+
......@@ -222,6 +270,24 @@ func singleFile(t *testing.T, filename string, prefix string) int {
return entries
}
func pipelineFile(t *testing.T, filename string, entries []string) int {
f, err := os.Create(filename)
if err != nil {
t.Fatal(err)
}
for _, entry := range entries {
line := fmt.Sprintf("%s\n", entry)
_, err = f.WriteString(line)
if err != nil {
t.Fatal(err)
}
time.Sleep(1 * time.Millisecond)
}
return len(entries)
}
func fileRoll(t *testing.T, filename string, prefix string) int {
f, err := os.Create(filename)
if err != nil {
......@@ -361,9 +427,10 @@ func waitForEntries(timeoutSec int, handler *testServerHandler, expectedCounts m
}
type testServerHandler struct {
receivedMap map[string][]logproto.Entry
recMtx sync.Mutex
t *testing.T
receivedMap map[string][]logproto.Entry
receivedLabels map[string][]labels.Labels
recMtx sync.Mutex
t *testing.T
}
func (h *testServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
......@@ -374,13 +441,13 @@ func (h *testServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
h.recMtx.Lock()
for _, s := range req.Streams {
labels, err := promql.ParseMetric(s.Labels)
parsedLabels, err := promql.ParseMetric(s.Labels)
if err != nil {
h.t.Error("Failed to parse incoming labels", err)
return
}
file := ""
for _, label := range labels {
for _, label := range parsedLabels {
if label.Name == targets.FilenameLabel {
file = label.Value
continue
......@@ -390,12 +457,21 @@ func (h *testServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.t.Error("Expected to find a label with name `filename` but did not!")
return
}
if _, ok := h.receivedMap[file]; ok {
h.receivedMap[file] = append(h.receivedMap[file], s.Entries...)
} else {
h.receivedMap[file] = s.Entries
}
if _, ok := h.receivedLabels[file]; ok {
h.receivedLabels[file] = append(h.receivedLabels[file], parsedLabels)
} else {
h.receivedLabels[file] = []labels.Labels{parsedLabels}
}
}
h.recMtx.Unlock()
}
......@@ -472,12 +548,44 @@ func buildTestConfig(t *testing.T, positionsFileName string, logDirName string)
cfg.PositionsConfig.SyncPeriod = 100 * time.Millisecond
cfg.PositionsConfig.PositionsFile = positionsFileName
pipeline := stages.PipelineStages{
stages.PipelineStage{
stages.StageTypeMatch: stages.MatcherConfig{
PipelineName: nil,
Selector: "{match=\"true\"}",
Stages: stages.PipelineStages{
stages.PipelineStage{
stages.StageTypeDocker: nil,
},
stages.PipelineStage{
stages.StageTypeRegex: stages.RegexConfig{
Expression: "^(?P<ip>\\S+) (?P<identd>\\S+) (?P<user>\\S+) \\[(?P<timestamp>[\\w:/]+\\s[+\\-]\\d{4})\\] \"(?P<action>\\S+)\\s?(?P<path>\\S+)?\\s?(?P<protocol>\\S+)?\" (?P<status>\\d{3}|-) (?P<size>\\d+|-)\\s?\"?(?P<referer>[^\"]*)\"?\\s?\"?(?P<useragent>[^\"]*)?\"?$",
Source: nil,
},
},
stages.PipelineStage{
stages.StageTypeTimestamp: stages.TimestampConfig{
Source: "timestamp",
Format: "02/Jan/2006:15:04:05 -0700",
},
},
stages.PipelineStage{
stages.StageTypeLabel: stages.LabelsConfig{
"action": nil,
},
},
},
},
},
}
targetGroup := targetgroup.Group{
Targets: []model.LabelSet{{
"localhost": "",
}},
Labels: model.LabelSet{
"job": "varlogs",
"match": "true",
"__path__": model.LabelValue(logDirName + "/**/*.log"),
},
Source: "",
......@@ -492,6 +600,7 @@ func buildTestConfig(t *testing.T, positionsFileName string, logDirName string)
scrapeConfig := scrape.Config{
JobName: "",
EntryParser: api.Raw,
PipelineStages: pipeline,
RelabelConfigs: nil,
ServiceDiscoveryConfig: serviceConfig,
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment