diff --git a/pkg/promtail/promtail_test.go b/pkg/promtail/promtail_test.go index 9936500fc8fdf4b2f38a6a538f8ce85079172d9f..b261db624074dde7cabda9d319833d9a8a0a9838 100644 --- a/pkg/promtail/promtail_test.go +++ b/pkg/promtail/promtail_test.go @@ -3,6 +3,8 @@ package promtail import ( "errors" "fmt" + "io" + "io/ioutil" "math/rand" "net/http" "os" @@ -18,6 +20,9 @@ import ( "github.com/prometheus/common/model" sd_config "github.com/prometheus/prometheus/discovery/config" "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" + "github.com/stretchr/testify/assert" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/parser" @@ -110,9 +115,21 @@ func TestPromtail(t *testing.T) { // Wait for all lines to be received. if err := waitForEntries(20, handler, expectedCounts); err != nil { - t.Fatal("Timed out waiting all log entries to be sent, still waiting for ", err) + t.Fatal("Timed out waiting for log entries: ", err) } + // Delete one of the log files so we can verify metrics are cleaned up + err = os.Remove(logFile1) + if err != nil { + t.Fatal("Could not delete a log file to verify metrics are removed: ", err) + } + + // Sync period is 100ms in tests, need to wait for at least one sync period for tailer to be cleaned up + <-time.After(150 * time.Millisecond) + + // Pull out some Prometheus metrics before shutting down + metricsBytes, contentType := getPromMetrics(t) + p.Shutdown() // Verify. 
@@ -126,6 +143,21 @@ func TestPromtail(t *testing.T) { t.Error("Somehow we ended up tailing more files than we were supposed to, this is likely a bug") } + readBytesMetrics := parsePromMetrics(t, metricsBytes, contentType, "promtail_read_bytes_total", "path") + fileBytesMetrics := parsePromMetrics(t, metricsBytes, contentType, "promtail_file_bytes_total", "path") + + verifyMetricAbsent(t, readBytesMetrics, "promtail_read_bytes_total", logFile1) + verifyMetricAbsent(t, fileBytesMetrics, "promtail_file_bytes_total", logFile1) + + verifyMetric(t, readBytesMetrics, "promtail_read_bytes_total", logFile2, 800) + verifyMetric(t, fileBytesMetrics, "promtail_file_bytes_total", logFile2, 800) + + verifyMetric(t, readBytesMetrics, "promtail_read_bytes_total", logFile3, 700) + verifyMetric(t, fileBytesMetrics, "promtail_file_bytes_total", logFile3, 700) + + verifyMetric(t, readBytesMetrics, "promtail_read_bytes_total", logFile4, 590) + verifyMetric(t, fileBytesMetrics, "promtail_file_bytes_total", logFile4, 590) + } func createStartupFile(t *testing.T, filename string) int { @@ -148,6 +180,22 @@ func verifyFile(t *testing.T, expected int, prefix string, entries []logproto.En } } +func verifyMetricAbsent(t *testing.T, metrics map[string]float64, metric string, label string) { + if _, ok := metrics[label]; ok { + t.Error("Found metric", metric, "with label", label, "which was not expected, "+ + "this metric should not be present") + } +} + +func verifyMetric(t *testing.T, metrics map[string]float64, metric string, label string, expected float64) { + if _, ok := metrics[label]; !ok { + t.Error("Expected to find metric ", metric, " with", label, "but it was not present") + } else { + actualBytes := metrics[label] + assert.Equal(t, expected, actualBytes, "found incorrect value for metric %s and label %s", metric, label) + } +} + func singleFile(t *testing.T, filename string, prefix string) int { f, err := os.Create(filename) if err != nil { @@ -294,9 +342,12 @@ func 
waitForEntries(timeoutSec int, handler *testServerHandler, expectedCounts m for file, expectedCount := range expectedCounts { if rcvd, ok := handler.receivedMap[file]; !ok || len(rcvd) != expectedCount { waiting = waiting + " " + file + for _, e := range rcvd { + level.Info(util.Logger).Log("file", file, "entry", e.Line) + } } } - return errors.New("Did not receive the correct number of logs within timeout, still waiting for logs from" + waiting) + return errors.New("still waiting for logs from" + waiting) } return nil } @@ -340,6 +391,55 @@ func (h *testServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.recMtx.Unlock() } +func getPromMetrics(t *testing.T) ([]byte, string) { + resp, err := http.Get("http://localhost:80/metrics") + if err != nil { + t.Fatal("Could not query metrics endpoint", err) + } + + if resp.StatusCode != http.StatusOK { + t.Fatal("Received a non 200 status code from /metrics endpoint", resp.StatusCode) + } + + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatal("Error reading response body from /metrics endpoint", err) + } + ct := resp.Header.Get("Content-Type") + return b, ct +} + +func parsePromMetrics(t *testing.T, bytes []byte, contentType string, metricName string, label string) map[string]float64 { + rb := map[string]float64{} + + pr := textparse.New(bytes, contentType) + for { + et, err := pr.Next() + if err == io.EOF { + break + } + if err != nil { + t.Fatal("Failed to parse prometheus metrics", err) + } + switch et { + case textparse.EntrySeries: + var res labels.Labels + _, _, v := pr.Series() + pr.Metric(&res) + switch res.Get(labels.MetricName) { + case metricName: + rb[res.Get(label)] = v + continue + default: + continue + } + default: + continue + } + } + return rb +} + func buildTestConfig(t *testing.T, positionsFileName string, logDirName string) config.Config { var clientURL flagext.URLValue err := clientURL.Set("http://localhost:3100/api/prom/push") diff --git a/pkg/promtail/targets/tailer.go 
b/pkg/promtail/targets/tailer.go index 804ef1fe750bf1fa0faf20173bab24797211a398..115e38dc795d8ef3c1039c12d0000042b2dad115 100644 --- a/pkg/promtail/targets/tailer.go +++ b/pkg/promtail/targets/tailer.go @@ -125,6 +125,9 @@ func (t *tailer) stop() error { close(t.quit) <-t.done filesActive.Add(-1.) + // When we stop tailing the file, also un-export metrics related to the file + readBytes.DeleteLabelValues(t.path) + totalBytes.DeleteLabelValues(t.path) level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path) return err }