From 07f20b0db3dac84835590c19b3e5fa1fe0addaad Mon Sep 17 00:00:00 2001
From: Ed <ed@edjusted.com>
Date: Tue, 29 Jan 2019 04:02:30 -0500
Subject: [PATCH] Unit tests for Promtail (#244)

* Adding a test for target.go
Fixing some shutdown issues where tailers were stopped without waiting for their current position to be saved to the positions file
Fixing an issue where an empty directory would lead to nothing being tailed

* adding more tests for target.go

* renaming quitComplete -> done per feedback
making PositionsFile private again, no reason for it to be public
removed unnecessary save of empty file on creation of positions file

* fixing lint errors

* cleaning up some comments
removing TODO around directories, added a test to verify behavior
some more cleanup per review

* fixing imports
---
 pkg/promtail/position.go    |   9 +-
 pkg/promtail/target.go      |  55 +++-
 pkg/promtail/target_test.go | 519 ++++++++++++++++++++++++++++++++++++
 3 files changed, 573 insertions(+), 10 deletions(-)
 create mode 100644 pkg/promtail/target_test.go

diff --git a/pkg/promtail/position.go b/pkg/promtail/position.go
index 98bbeb21..8a93d989 100644
--- a/pkg/promtail/position.go
+++ b/pkg/promtail/position.go
@@ -34,6 +34,7 @@ type Positions struct {
 	mtx       sync.Mutex
 	positions map[string]int64
 	quit      chan struct{}
+	done      chan struct{}
 }
 
 type positionsFile struct {
@@ -52,6 +53,7 @@ func NewPositions(logger log.Logger, cfg PositionsConfig) (*Positions, error) {
 		cfg:       cfg,
 		positions: positions,
 		quit:      make(chan struct{}),
+		done:      make(chan struct{}),
 	}
 
 	go p.run()
@@ -61,6 +63,7 @@ func NewPositions(logger log.Logger, cfg PositionsConfig) (*Positions, error) {
 // Stop the Position tracker.
 func (p *Positions) Stop() {
 	close(p.quit)
+	<-p.done
 }
 
 // Put records (asynchronously) how far we've read through a file.
@@ -85,7 +88,11 @@ func (p *Positions) Remove(path string) {
 }
 
 func (p *Positions) run() {
-	defer p.save()
+	defer func() {
+		p.save()
+		level.Debug(p.logger).Log("msg", "positions saved")
+		close(p.done)
+	}()
 
 	ticker := time.NewTicker(p.cfg.SyncPeriod)
 	for {
diff --git a/pkg/promtail/target.go b/pkg/promtail/target.go
index 5f31093b..b19efa2a 100644
--- a/pkg/promtail/target.go
+++ b/pkg/promtail/target.go
@@ -11,7 +11,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
-	fsnotify "gopkg.in/fsnotify.v1"
+	"gopkg.in/fsnotify.v1"
 
 	"github.com/grafana/loki/pkg/helpers"
 )
@@ -49,6 +49,7 @@ type Target struct {
 	watcher *fsnotify.Watcher
 	path    string
 	quit    chan struct{}
+	done    chan struct{}
 
 	tails map[string]*tailer
 }
@@ -75,8 +76,15 @@ func NewTarget(logger log.Logger, handler EntryHandler, positions *Positions, pa
 	for _, p := range matches {
 		dirs[filepath.Dir(p)] = struct{}{}
 	}
+
+	// If no files exist yet watch the directory specified in the path.
+	if matches == nil {
+		dirs[filepath.Dir(path)] = struct{}{}
+	}
+
 	// watch each dir for any new files.
 	for dir := range dirs {
+		level.Debug(logger).Log("msg", "watching new directory", "directory", dir)
 		if err := watcher.Add(dir); err != nil {
 			helpers.LogError("closing watcher", watcher.Close)
 			return nil, errors.Wrap(err, "watcher.Add")
@@ -90,6 +98,7 @@ func NewTarget(logger log.Logger, handler EntryHandler, positions *Positions, pa
 		handler:   addLabelsMiddleware(labels).Wrap(handler),
 		positions: positions,
 		quit:      make(chan struct{}),
+		done:      make(chan struct{}),
 		tails:     map[string]*tailer{},
 	}
 
@@ -120,6 +129,7 @@ func NewTarget(logger log.Logger, handler EntryHandler, positions *Positions, pa
 // Stop the target.
 func (t *Target) Stop() {
 	close(t.quit)
+	<-t.done
 }
 
 func (t *Target) run() {
@@ -128,6 +138,10 @@ func (t *Target) run() {
 		for _, v := range t.tails {
 			helpers.LogError("stopping tailer", v.stop)
 		}
+		//Save positions
+		t.positions.Stop()
+		level.Debug(t.logger).Log("msg", "watcher closed, tailer stopped, positions saved")
+		close(t.done)
 	}()
 
 	for {
@@ -155,6 +169,7 @@ func (t *Target) run() {
 					continue
 				}
 
+				level.Debug(t.logger).Log("msg", "tailing new file", "filename", event.Name)
 				t.tails[event.Name] = tailer
 
 			case fsnotify.Remove:
@@ -191,6 +206,9 @@ type tailer struct {
 
 	path string
 	tail *tail.Tail
+
+	quit chan struct{}
+	done chan struct{}
 }
 
 func newTailer(logger log.Logger, handler EntryHandler, positions *Positions, path string) (*tailer, error) {
@@ -212,31 +230,36 @@ func newTailer(logger log.Logger, handler EntryHandler, positions *Positions, pa
 
 		path: path,
 		tail: tail,
+		quit: make(chan struct{}),
+		done: make(chan struct{}),
 	}
 	go tailer.run()
 	return tailer, nil
 }
 
 func (t *tailer) run() {
-	defer func() {
-		level.Info(t.logger).Log("msg", "stopping tailing file", "filename", t.path)
-	}()
-
 	level.Info(t.logger).Log("msg", "start tailing file", "filename", t.path)
 	positionSyncPeriod := t.positions.cfg.SyncPeriod
 	positionWait := time.NewTicker(positionSyncPeriod)
-	defer positionWait.Stop()
+
+	defer func() {
+		level.Info(t.logger).Log("msg", "stopping tailing file", "filename", t.path)
+		positionWait.Stop()
+		err := t.markPosition()
+		if err != nil {
+			level.Error(t.logger).Log("msg", "error getting tail position", "error", err)
+		}
+		close(t.done)
+	}()
 
 	for {
 		select {
-
 		case <-positionWait.C:
-			pos, err := t.tail.Tell()
+			err := t.markPosition()
 			if err != nil {
 				level.Error(t.logger).Log("msg", "error getting tail position", "error", err)
 				continue
 			}
-			t.positions.Put(t.path, pos)
 
 		case line, ok := <-t.tail.Lines:
 			if !ok {
@@ -252,11 +275,25 @@ func (t *tailer) run() {
 			if err := t.handler.Handle(model.LabelSet{}, line.Time, line.Text); err != nil {
 				level.Error(t.logger).Log("msg", "error handling line", "error", err)
 			}
+		case <-t.quit:
+			return
 		}
 	}
 }
 
+func (t *tailer) markPosition() error {
+	pos, err := t.tail.Tell()
+	if err != nil {
+		return err
+	}
+	level.Debug(t.logger).Log("path", t.path, "current_position", pos)
+	t.positions.Put(t.path, pos)
+	return nil
+}
+
 func (t *tailer) stop() error {
+	close(t.quit)
+	<-t.done
 	return t.tail.Stop()
 }
 
diff --git a/pkg/promtail/target_test.go b/pkg/promtail/target_test.go
new file mode 100644
index 00000000..22a88049
--- /dev/null
+++ b/pkg/promtail/target_test.go
@@ -0,0 +1,519 @@
+package promtail
+
+import (
+	"io/ioutil"
+	"math/rand"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/go-kit/kit/log/level"
+
+	"github.com/go-kit/kit/log"
+	"github.com/prometheus/common/model"
+	"gopkg.in/yaml.v2"
+)
+
+// TestLongSyncDelayStillSavesCorrectPosition checks that stopping a target saves the final read position.
+func TestLongSyncDelayStillSavesCorrectPosition(t *testing.T) {
+	w := log.NewSyncWriter(os.Stderr)
+	logger := log.NewLogfmtLogger(w)
+
+	initRandom()
+	dirName := "/tmp/" + randName()
+	positionsFileName := dirName + "/positions.yml"
+	logFile := dirName + "/test.log"
+
+	err := os.MkdirAll(dirName, 0750)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	defer os.RemoveAll(dirName)
+
+	// Use a long sync period so the periodic sync does not fire during the test; any position found in the
+	// positions file must therefore have been written when target.Stop() was called.
+	positions, err := NewPositions(logger, PositionsConfig{
+		SyncPeriod:    10 * time.Second,
+		PositionsFile: positionsFileName,
+	})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	client := &TestClient{
+		log:      logger,
+		messages: make([]string, 0),
+	}
+
+	target, err := NewTarget(logger, client, positions, logFile, nil)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	f, err := os.Create(logFile)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	for i := 0; i < 10; i++ {
+		_, err = f.WriteString("test\n")
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		time.Sleep(1 * time.Millisecond)
+	}
+
+	target.Stop()
+
+	buf, err := ioutil.ReadFile(filepath.Clean(positionsFileName))
+	if err != nil {
+		t.Error("Expected to find a positions file but did not", err)
+		return
+	}
+	var p positionsFile
+	if err := yaml.UnmarshalStrict(buf, &p); err != nil {
+		t.Error("Failed to parse positions file:", err)
+		return
+	}
+
+	// Assert the saved offset: 10 lines of "test\n" at 5 bytes each is 50.
+	if val, ok := p.Positions[logFile]; ok {
+		if val != 50 {
+			t.Error("Incorrect position found, expected 50, found", val)
+		}
+	} else {
+		t.Error("Positions file did not contain any data for our test log file")
+	}
+
+	// Assert the message count; Fatal (not Error) because the index below would panic on an empty slice.
+	if len(client.messages) != 10 {
+		t.Fatal("Handler did not receive the correct number of messages, expected 10 received", len(client.messages))
+	}
+
+	// Spot check one of the messages.
+	if client.messages[0] != "test" {
+		t.Error("Expected first log message to be 'test' but was", client.messages[0])
+	}
+}
+
+// TestWatchEntireDirectory checks that a "dir/*" target tails a file created in that directory after startup.
+func TestWatchEntireDirectory(t *testing.T) {
+	w := log.NewSyncWriter(os.Stderr)
+	logger := log.NewLogfmtLogger(w)
+
+	initRandom()
+	dirName := "/tmp/" + randName()
+	positionsFileName := dirName + "/positions.yml"
+	logFileDir := dirName + "/logdir/"
+
+	err := os.MkdirAll(dirName, 0750)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	err = os.MkdirAll(logFileDir, 0750)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	defer os.RemoveAll(dirName)
+
+	// Use a long sync period so the periodic sync does not fire during the test; any position found in the
+	// positions file must therefore have been written when target.Stop() was called.
+	positions, err := NewPositions(logger, PositionsConfig{
+		SyncPeriod:    10 * time.Second,
+		PositionsFile: positionsFileName,
+	})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	client := &TestClient{
+		log:      logger,
+		messages: make([]string, 0),
+	}
+
+	target, err := NewTarget(logger, client, positions, logFileDir+"*", nil)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	f, err := os.Create(logFileDir + "test.log")
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	for i := 0; i < 10; i++ {
+		_, err = f.WriteString("test\n")
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		time.Sleep(1 * time.Millisecond)
+	}
+
+	target.Stop()
+
+	buf, err := ioutil.ReadFile(filepath.Clean(positionsFileName))
+	if err != nil {
+		t.Error("Expected to find a positions file but did not", err)
+		return
+	}
+	var p positionsFile
+	if err := yaml.UnmarshalStrict(buf, &p); err != nil {
+		t.Error("Failed to parse positions file:", err)
+		return
+	}
+
+	// Assert the saved offset: 10 lines of "test\n" at 5 bytes each is 50.
+	if val, ok := p.Positions[logFileDir+"test.log"]; ok {
+		if val != 50 {
+			t.Error("Incorrect position found, expected 50, found", val)
+		}
+	} else {
+		t.Error("Positions file did not contain any data for our test log file")
+	}
+
+	// Assert the message count; Fatal (not Error) because the index below would panic on an empty slice.
+	if len(client.messages) != 10 {
+		t.Fatal("Handler did not receive the correct number of messages, expected 10 received", len(client.messages))
+	}
+
+	// Spot check one of the messages.
+	if client.messages[0] != "test" {
+		t.Error("Expected first log message to be 'test' but was", client.messages[0])
+	}
+}
+
+// TestFileRolls checks that lines are still delivered after the log file is renamed and recreated under its old name.
+func TestFileRolls(t *testing.T) {
+	w := log.NewSyncWriter(os.Stderr)
+	logger := log.NewLogfmtLogger(w)
+
+	initRandom()
+	dirName := "/tmp/" + randName()
+	positionsFileName := dirName + "/positions.yml"
+	logFile := dirName + "/test.log"
+
+	err := os.MkdirAll(dirName, 0750)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	defer os.RemoveAll(dirName)
+
+	// Use a long sync period so the periodic position sync does not fire while the test is running.
+	positions, err := NewPositions(logger, PositionsConfig{
+		SyncPeriod:    10 * time.Second,
+		PositionsFile: positionsFileName,
+	})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	client := &TestClient{
+		log:      logger,
+		messages: make([]string, 0),
+	}
+
+	target, err := NewTarget(logger, client, positions, dirName+"/*.log", nil)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	f, err := os.Create(logFile)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	for i := 0; i < 10; i++ {
+		_, err = f.WriteString("test1\n")
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		time.Sleep(1 * time.Millisecond)
+	}
+
+	// Rename the log file to something not in the pattern, then create a new file with the same name.
+	err = os.Rename(logFile, dirName+"/test.log.1")
+	if err != nil {
+		t.Error("Failed to rename log file for test", err)
+		return
+	}
+	f, err = os.Create(logFile)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	for i := 0; i < 10; i++ {
+		_, err = f.WriteString("test2\n")
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		time.Sleep(1 * time.Millisecond)
+	}
+
+	target.Stop()
+
+	if len(client.messages) != 20 {
+		t.Fatal("Handler did not receive the correct number of messages, expected 20 received", len(client.messages))
+	}
+
+	// Spot check one of the messages (indexing is safe: a wrong count is fatal above).
+	if client.messages[0] != "test1" {
+		t.Error("Expected first log message to be 'test1' but was", client.messages[0])
+	}
+
+	// Spot check the first message from the second file.
+	if client.messages[10] != "test2" {
+		t.Error("Expected first log message to be 'test2' but was", client.messages[10])
+	}
+}
+
+// TestResumesWhereLeftOff checks that a second target using the saved positions does not resend earlier lines.
+func TestResumesWhereLeftOff(t *testing.T) {
+	w := log.NewSyncWriter(os.Stderr)
+	logger := log.NewLogfmtLogger(w)
+
+	initRandom()
+	dirName := "/tmp/" + randName()
+	positionsFileName := dirName + "/positions.yml"
+	logFile := dirName + "/test.log"
+
+	err := os.MkdirAll(dirName, 0750)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	defer os.RemoveAll(dirName)
+
+	// Use a long sync period so positions are only written to disk when target.Stop() is called.
+	positions, err := NewPositions(logger, PositionsConfig{
+		SyncPeriod:    10 * time.Second,
+		PositionsFile: positionsFileName,
+	})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	client := &TestClient{
+		log:      logger,
+		messages: make([]string, 0),
+	}
+
+	target, err := NewTarget(logger, client, positions, dirName+"/*.log", nil)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	f, err := os.Create(logFile)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	for i := 0; i < 10; i++ {
+		_, err = f.WriteString("test1\n")
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		time.Sleep(1 * time.Millisecond)
+	}
+
+	target.Stop()
+
+	// Create another positions (so that it loads from the previously saved positions file).
+	positions2, err := NewPositions(logger, PositionsConfig{
+		SyncPeriod:    10 * time.Second,
+		PositionsFile: positionsFileName,
+	})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	// Create a new target, keep the same client so we can track what was sent through the handler.
+	target2, err := NewTarget(logger, client, positions2, dirName+"/*.log", nil)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	for i := 0; i < 10; i++ {
+		_, err = f.WriteString("test2\n")
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		time.Sleep(1 * time.Millisecond)
+	}
+
+	target2.Stop()
+
+	if len(client.messages) != 20 {
+		t.Fatal("Handler did not receive the correct number of messages, expected 20 received", len(client.messages))
+	}
+
+	// Spot check one of the messages (indexing is safe: a wrong count is fatal above).
+	if client.messages[0] != "test1" {
+		t.Error("Expected first log message to be 'test1' but was", client.messages[0])
+	}
+
+	// Spot check the first message written after the second target was started.
+	if client.messages[10] != "test2" {
+		t.Error("Expected first log message to be 'test2' but was", client.messages[10])
+	}
+}
+
+// TestGlobWithMultipleFiles checks that a glob target tails every matching file and saves a position for each.
+func TestGlobWithMultipleFiles(t *testing.T) {
+	w := log.NewSyncWriter(os.Stderr)
+	logger := log.NewLogfmtLogger(w)
+
+	initRandom()
+	dirName := "/tmp/" + randName()
+	positionsFileName := dirName + "/positions.yml"
+	logFile1 := dirName + "/test.log"
+	logFile2 := dirName + "/dirt.log"
+
+	err := os.MkdirAll(dirName, 0750)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	defer os.RemoveAll(dirName)
+
+	// Use a long sync period so positions are only written to disk when target.Stop() is called.
+	positions, err := NewPositions(logger, PositionsConfig{
+		SyncPeriod:    10 * time.Second,
+		PositionsFile: positionsFileName,
+	})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	client := &TestClient{
+		log:      logger,
+		messages: make([]string, 0),
+	}
+
+	target, err := NewTarget(logger, client, positions, dirName+"/*.log", nil)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	f1, err := os.Create(logFile1)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	f2, err := os.Create(logFile2)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	for i := 0; i < 10; i++ {
+		_, err = f1.WriteString("test1\n")
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		time.Sleep(1 * time.Millisecond)
+		_, err = f2.WriteString("dirt1\n")
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		time.Sleep(1 * time.Millisecond)
+	}
+
+	target.Stop()
+
+	buf, err := ioutil.ReadFile(filepath.Clean(positionsFileName))
+	if err != nil {
+		t.Error("Expected to find a positions file but did not", err)
+		return
+	}
+	var p positionsFile
+	if err := yaml.UnmarshalStrict(buf, &p); err != nil {
+		t.Error("Failed to parse positions file:", err)
+		return
+	}
+
+	// Assert the saved offsets: each file received 10 lines of 6 bytes, so 60.
+	if val, ok := p.Positions[logFile1]; ok {
+		if val != 60 {
+			t.Error("Incorrect position found for file 1, expected 60, found", val)
+		}
+	} else {
+		t.Error("Positions file did not contain any data for our test log file")
+	}
+	if val, ok := p.Positions[logFile2]; ok {
+		if val != 60 {
+			t.Error("Incorrect position found for file 2, expected 60, found", val)
+		}
+	} else {
+		t.Error("Positions file did not contain any data for our test log file")
+	}
+
+	// Assert the message count; Fatal (not Error) because the indexing below would panic on a short slice.
+	if len(client.messages) != 20 {
+		t.Fatal("Handler did not receive the correct number of messages, expected 20 received", len(client.messages))
+	}
+
+	// Spot check one of the messages, the first message should be from the first file because we wrote that first.
+	if client.messages[0] != "test1" {
+		t.Error("Expected first log message to be 'test1' but was", client.messages[0])
+	}
+
+	// Spot check the second message, it should be from the second file.
+	if client.messages[1] != "dirt1" {
+		t.Error("Expected second log message to be 'dirt1' but was", client.messages[1])
+	}
+}
+
+type TestClient struct {
+	log      log.Logger
+	messages []string
+}
+
+func (c *TestClient) Handle(ls model.LabelSet, t time.Time, s string) error {
+	c.messages = append(c.messages, s)
+	level.Debug(c.log).Log("msg", "received log", "log", s)
+	return nil
+}
+
+func initRandom() {
+	rand.Seed(time.Now().UnixNano())
+}
+
+var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
+
+func randName() string {
+	b := make([]rune, 10)
+	for i := range b {
+		b[i] = letters[rand.Intn(len(letters))]
+	}
+	return string(b)
+}
-- 
GitLab