From 45a7556da20f635c45370bc8a2261a15b4c7a8d8 Mon Sep 17 00:00:00 2001
From: Edward Welch <edward.welch@grafana.com>
Date: Mon, 13 May 2019 23:32:43 -0400
Subject: [PATCH] improving metrics, improving tests, config via flags, added
 ksonnet config

---
 cmd/loki-canary/main.go                       | 53 +++++++++++++--
 go.mod                                        |  1 +
 pkg/comparator/comparator.go                  | 42 +++++++++---
 pkg/comparator/comparator_test.go             | 66 ++++++++++++++++++-
 pkg/reader/logproto.pb.go                     | 21 +++---
 .../ksonnet/loki-canary/jsonnetfile.json      | 14 ++++
 .../ksonnet/loki-canary/loki-canary.libsonnet | 46 +++++++++++++
 7 files changed, 217 insertions(+), 26 deletions(-)
 create mode 100644 production/ksonnet/loki-canary/jsonnetfile.json
 create mode 100644 production/ksonnet/loki-canary/loki-canary.libsonnet

diff --git a/cmd/loki-canary/main.go b/cmd/loki-canary/main.go
index 40dab0d4..442fcc39 100644
--- a/cmd/loki-canary/main.go
+++ b/cmd/loki-canary/main.go
@@ -1,11 +1,14 @@
 package main
 
 import (
+	"flag"
 	"fmt"
+	"io/ioutil"
 	"net/http"
 	"net/url"
 	"os"
 	"os/signal"
+	"strconv"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -17,22 +20,60 @@ import (
 
 func main() {
 
-	c := comparator.NewComparator(os.Stderr, 1*time.Minute, 1*time.Second)
+	lName := flag.String("labelname", "name", "The label name for this instance of loki-canary to use in the log selector")
+	lVal := flag.String("labelvalue", "loki-canary", "The unique label value for this instance of loki-canary to use in the log selector")
+	usePodName := flag.Bool("usepod", false, "If true, loki-canary will read the pod name from /etc/loki-canary/name as the unique label value") // help text must name the same file main reads with ioutil.ReadFile
+	port := flag.Int("port", 3500, "Port which loki-canary should expose metrics") // used for the /metrics HTTP listener
+	addr := flag.String("addr", "", "The Loki server URL:Port, e.g. loki:3100") // required: main panics when it is empty
+	tls := flag.Bool("tls", false, "Does the loki connection use TLS?") // switches the tail URL scheme from ws to wss
+	user := flag.String("user", "", "Loki username") // when non-empty, user and pass become the tail URL's userinfo
+	pass := flag.String("pass", "", "Loki password")
 
-	w := writer.NewWriter(os.Stdout, c, 10*time.Millisecond, 1024)
+	interval := flag.Duration("interval", 1000*time.Millisecond, "Duration between log entries")
+	size := flag.Int("size", 100, "Size in bytes of each log line")
+	wait := flag.Duration("wait", 60*time.Second, "Duration to wait for log entries before reporting them lost")
+	flag.Parse()
+
+	val := *lVal
+	if *usePodName {
+		data, err := ioutil.ReadFile("/etc/loki-canary/name")
+		if err != nil {
+			panic(err)
+		}
+		val = string(data)
+	}
+
+	if *addr == "" {
+		panic("Must specify a Loki address with -addr")
+	}
+
+	var ui *url.Userinfo
+	if *user != "" {
+		ui = url.UserPassword(*user, *pass)
+	}
+
+	scheme := "ws"
+	if *tls {
+		scheme = "wss"
+	}
 
 	u := url.URL{
-		Scheme:   "ws",
-		Host:     "loki:3100",
+		Scheme:   scheme,
+		Host:     *addr,
+		User:     ui,
 		Path:     "/api/prom/tail",
-		RawQuery: "query=" + url.QueryEscape("{name=\"loki-canary\",stream=\"stdout\"}"),
+		RawQuery: "query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", *lName, val)),
 	}
 
+	_, _ = fmt.Fprintf(os.Stderr, "Connecting to loki at %v, querying for label '%v' with value '%v'\n", u.String(), *lName, val)
+
+	c := comparator.NewComparator(os.Stderr, *wait, 1*time.Second)
+	w := writer.NewWriter(os.Stdout, c, *interval, *size)
 	r := reader.NewReader(os.Stderr, c, u, "", "")
 
 	http.Handle("/metrics", promhttp.Handler())
 	go func() {
-		err := http.ListenAndServe(":2112", nil)
+		err := http.ListenAndServe(":"+strconv.Itoa(*port), nil)
 		if err != nil {
 			panic(err)
 		}
diff --git a/go.mod b/go.mod
index 4703d635..8e45d829 100644
--- a/go.mod
+++ b/go.mod
@@ -7,6 +7,7 @@ require (
 	github.com/golang/protobuf v1.3.1 // indirect
 	github.com/gorilla/websocket v1.4.0
 	github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829
+	github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f
 	github.com/prometheus/common v0.3.0 // indirect
 	github.com/stretchr/testify v1.3.0
 	golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 // indirect
diff --git a/pkg/comparator/comparator.go b/pkg/comparator/comparator.go
index 7038acae..c349752a 100644
--- a/pkg/comparator/comparator.go
+++ b/pkg/comparator/comparator.go
@@ -16,13 +16,31 @@ const (
 )
 
 var (
-	outOfOrderEntry = promauto.NewCounter(prometheus.CounterOpts{
-		Name: "out_of_order_entry",
-		Help: "The total number of processed events",
+	totalEntries = promauto.NewCounter(prometheus.CounterOpts{ // incremented by EntrySent for every entry appended to the buffer
+		Namespace: "loki_canary",
+		Name:      "total_entries",
+		Help:      "counts log entries written to the file",
 	})
-	missingEntry = promauto.NewCounter(prometheus.CounterOpts{
-		Name: "missing_entry",
-		Help: "The total number of processed events",
+	outOfOrderEntries = promauto.NewCounter(prometheus.CounterOpts{ // incremented by EntryReceived when the matched entry is not the first one in the buffer
+		Namespace: "loki_canary",
+		Name:      "out_of_order_entries",
+		Help:      "counts log entries received with a timestamp more recent than the others in the queue",
+	})
+	missingEntries = promauto.NewCounter(prometheus.CounterOpts{ // incremented by pruneEntries for each buffered entry older than maxWait
+		Namespace: "loki_canary",
+		Name:      "missing_entries",
+		Help:      "counts log entries not received within the maxWait duration and is reported as missing",
+	})
+	unexpectedEntries = promauto.NewCounter(prometheus.CounterOpts{ // incremented by EntryReceived when no buffered entry has the received timestamp
+		Namespace: "loki_canary",
+		Name:      "unexpected_entries",
+		Help:      "counts a log entry received which was not expected (e.g. duplicate, received after reported missing)",
+	})
+	responseLatency = promauto.NewHistogram(prometheus.HistogramOpts{ // observed by EntryReceived on a match: seconds elapsed since the entry's timestamp
+		Namespace: "loki_canary",
+		Name:      "response_latency",
+		Help:      "is how long it takes for log lines to be returned from Loki in seconds.",
+		Buckets:   []float64{0.5, 1, 2.5, 5, 10, 30, 60}, // bucket upper bounds, in seconds
 	})
 )
 
@@ -60,21 +78,26 @@ func (c *Comparator) EntrySent(time time.Time) {
 	c.entMtx.Lock()
 	defer c.entMtx.Unlock()
 	c.entries = append(c.entries, &time)
+	totalEntries.Inc()
 }
 
+// EntryReceived removes the received entry from the buffer if it exists, reports on out of order entries received
 func (c *Comparator) EntryReceived(ts time.Time) {
 	c.entMtx.Lock()
 	defer c.entMtx.Unlock()
 
 	// Output index
 	k := 0
+	matched := false
 	for i, e := range c.entries {
 		if ts.Equal(*e) {
+			matched = true
 			// If this isn't the first item in the list we received it out of order
 			if i != 0 {
-				outOfOrderEntry.Inc()
+				outOfOrderEntries.Inc()
 				_, _ = fmt.Fprintf(c.w, ErrOutOfOrderEntry, e, c.entries[:i])
 			}
+			responseLatency.Observe(time.Now().Sub(ts).Seconds())
 			// Do not increment output index, effectively causing this element to be dropped
 		} else {
 			// If the current index doesn't match the output index, update the array with the correct position
@@ -84,6 +107,9 @@ func (c *Comparator) EntryReceived(ts time.Time) {
 			k++
 		}
 	}
+	if !matched {
+		unexpectedEntries.Inc()
+	}
 	// Nil out the pointers to any trailing elements which were removed from the slice
 	for i := k; i < len(c.entries); i++ {
 		c.entries[i] = nil // or the zero value of T
@@ -120,7 +146,7 @@ func (c *Comparator) pruneEntries() {
 	for i, e := range c.entries {
 		// If the time is outside our range, assume the entry has been lost report and remove it
 		if e.Before(time.Now().Add(-c.maxWait)) {
-			missingEntry.Inc()
+			missingEntries.Inc()
 			_, _ = fmt.Fprintf(c.w, ErrEntryNotReceived, e, c.maxWait.Seconds())
 		} else {
 			if i != k {
diff --git a/pkg/comparator/comparator_test.go b/pkg/comparator/comparator_test.go
index 37275e8a..6e8cc5fa 100644
--- a/pkg/comparator/comparator_test.go
+++ b/pkg/comparator/comparator_test.go
@@ -3,13 +3,20 @@ package comparator
 import (
 	"bytes"
 	"fmt"
+	"sync"
 	"testing"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
+	io_prometheus_client "github.com/prometheus/client_model/go"
 	"github.com/stretchr/testify/assert"
 )
 
 func TestComparatorEntryReceivedOutOfOrder(t *testing.T) {
+	outOfOrderEntries = &mockCounter{}
+	missingEntries = &mockCounter{}
+	unexpectedEntries = &mockCounter{}
+
 	actual := &bytes.Buffer{}
 	c := NewComparator(actual, 1*time.Hour, 1*time.Hour)
 
@@ -24,13 +31,26 @@ func TestComparatorEntryReceivedOutOfOrder(t *testing.T) {
 	c.EntrySent(t4)
 
 	c.EntryReceived(t1)
+	assert.Equal(t, 3, c.Size())
 	c.EntryReceived(t4)
-	expected := fmt.Sprintf(ErrOutOfOrderEntry, t4, []time.Time{t2, t3})
+	assert.Equal(t, 2, c.Size())
+	c.EntryReceived(t2)
+	c.EntryReceived(t3)
+	assert.Equal(t, 0, c.Size())
 
+	expected := fmt.Sprintf(ErrOutOfOrderEntry, t4, []time.Time{t2, t3})
 	assert.Equal(t, expected, actual.String())
+
+	assert.Equal(t, 1, outOfOrderEntries.(*mockCounter).count)
+	assert.Equal(t, 0, unexpectedEntries.(*mockCounter).count)
+	assert.Equal(t, 0, missingEntries.(*mockCounter).count)
 }
 
 func TestComparatorEntryReceivedNotExpected(t *testing.T) {
+	outOfOrderEntries = &mockCounter{}
+	missingEntries = &mockCounter{}
+	unexpectedEntries = &mockCounter{}
+
 	actual := &bytes.Buffer{}
 	c := NewComparator(actual, 1*time.Hour, 1*time.Hour)
 
@@ -51,12 +71,19 @@ func TestComparatorEntryReceivedNotExpected(t *testing.T) {
 	assert.Equal(t, 1, c.Size())
 	c.EntryReceived(t4)
 	assert.Equal(t, 0, c.Size())
-	expected := ""
 
+	expected := ""
 	assert.Equal(t, expected, actual.String())
+
+	assert.Equal(t, 0, outOfOrderEntries.(*mockCounter).count)
+	assert.Equal(t, 1, unexpectedEntries.(*mockCounter).count)
+	assert.Equal(t, 0, missingEntries.(*mockCounter).count)
 }
 
 func TestEntryNeverReceived(t *testing.T) {
+	outOfOrderEntries = &mockCounter{}
+	missingEntries = &mockCounter{}
+	unexpectedEntries = &mockCounter{}
 
 	actual := &bytes.Buffer{}
 	c := NewComparator(actual, 5*time.Millisecond, 2*time.Millisecond)
@@ -86,4 +113,39 @@ func TestEntryNeverReceived(t *testing.T) {
 	assert.Equal(t, expected, actual.String())
 	assert.Equal(t, 0, c.Size())
 
+	assert.Equal(t, 0, outOfOrderEntries.(*mockCounter).count)
+	assert.Equal(t, 0, unexpectedEntries.(*mockCounter).count)
+	assert.Equal(t, 1, missingEntries.(*mockCounter).count)
+
+}
+
+type mockCounter struct { // test stand-in assigned to the package-level counters; only Inc is implemented
+	cLck  sync.Mutex // held by Inc while it updates count
+	count int        // number of Inc calls so far; the tests assert on this field directly
+}
+
+func (m *mockCounter) Desc() *prometheus.Desc { // unimplemented: panics if called
+	panic("implement me")
+}
+
+func (m *mockCounter) Write(*io_prometheus_client.Metric) error { // unimplemented: panics if called
+	panic("implement me")
+}
+
+func (m *mockCounter) Describe(chan<- *prometheus.Desc) { // unimplemented: panics if called
+	panic("implement me")
+}
+
+func (m *mockCounter) Collect(chan<- prometheus.Metric) { // unimplemented: panics if called
+	panic("implement me")
+}
+
+func (m *mockCounter) Add(float64) { // unimplemented: panics if called, so code under test must only use Inc
+	panic("implement me")
+}
+
+func (m *mockCounter) Inc() { // adds one to count under cLck
+	m.cLck.Lock()
+	defer m.cLck.Unlock()
+	m.count++
 }
diff --git a/pkg/reader/logproto.pb.go b/pkg/reader/logproto.pb.go
index a555d6d9..39c6ac71 100644
--- a/pkg/reader/logproto.pb.go
+++ b/pkg/reader/logproto.pb.go
@@ -3,19 +3,20 @@
 package reader
 
 import (
-	context "context"
-	fmt "fmt"
+	"context"
+	"fmt"
+	"io"
+	"math"
+	"reflect"
+	"strconv"
+	"strings"
+	"time"
+
 	_ "github.com/gogo/protobuf/gogoproto"
-	proto "github.com/gogo/protobuf/proto"
+	"github.com/gogo/protobuf/proto"
 	_ "github.com/gogo/protobuf/types"
 	github_com_gogo_protobuf_types "github.com/gogo/protobuf/types"
-	grpc "google.golang.org/grpc"
-	io "io"
-	math "math"
-	reflect "reflect"
-	strconv "strconv"
-	strings "strings"
-	time "time"
+	"google.golang.org/grpc"
 )
 
 // Reference imports to suppress errors if they are not otherwise used.
diff --git a/production/ksonnet/loki-canary/jsonnetfile.json b/production/ksonnet/loki-canary/jsonnetfile.json
new file mode 100644
index 00000000..c903ac17
--- /dev/null
+++ b/production/ksonnet/loki-canary/jsonnetfile.json
@@ -0,0 +1,14 @@
+{
+    "dependencies": [
+        {
+            "name": "ksonnet-util",
+            "source": {
+                "git": {
+                    "remote": "https://github.com/grafana/jsonnet-libs",
+                    "subdir": "ksonnet-util"
+                }
+            },
+            "version": "master"
+        }
+    ]
+}
diff --git a/production/ksonnet/loki-canary/loki-canary.libsonnet b/production/ksonnet/loki-canary/loki-canary.libsonnet
new file mode 100644
index 00000000..195df2aa
--- /dev/null
+++ b/production/ksonnet/loki-canary/loki-canary.libsonnet
@@ -0,0 +1,46 @@
+local k = import 'ksonnet-util/kausal.libsonnet';
+
+k {
+  local container = $.core.v1.container,
+
+  loki_canary_args:: {},  // flags for the loki-canary binary; rendered into container args by $.util.mapToFlags
+
+  _images+:: {
+    loki_canary: 'loki-canary:latest',
+  },
+
+  loki_canary_container::
+    container.new('loki-canary', $._images.loki_canary) +
+    container.withPorts($.core.v1.containerPort.new('http-metrics', if std.objectHas($.loki_canary_args, 'port') then $.loki_canary_args.port else 3500)) +  // follow the -port flag; 3500 is the binary's default
+    container.withArgsMixin($.util.mapToFlags($.loki_canary_args)) +
+    container.withEnv([
+      container.envType.fromFieldPath('HOSTNAME', 'spec.nodeName'),  // HOSTNAME is set from the pod's spec.nodeName field
+    ]),
+
+  local daemonSet = $.extensions.v1beta1.daemonSet,
+
+  local downwardApiMount(name, path, volumeMountMixin={}) =  // mixin: adds a downward API volume `name` and mounts it at `path` in every container
+        local container = $.core.v1.container,
+              deployment = $.extensions.v1beta1.deployment,
+              volumeMount = $.core.v1.volumeMount,
+              volume = $.core.v1.volume,
+              addMount(c) = c + container.withVolumeMountsMixin(
+          volumeMount.new(name, path) +
+          volumeMountMixin,
+        );
+
+        deployment.mapContainers(addMount) +
+        deployment.mixin.spec.template.spec.withVolumesMixin([
+          volume.withName(name) +
+          volume.mixin.downwardApi.withItems([
+            {
+              path: "name",  // file name inside the mount; its content comes from metadata.name
+              fieldRef: { fieldPath: "metadata.name" },
+            },
+          ]),
+        ]),
+
+  loki_canary_daemonset:
+    daemonSet.new('loki-canary', [$.loki_canary_container]) +
+    downwardApiMount('pod-name', '/etc/loki-canary'),  // exposes the pod name as /etc/loki-canary/name
+}
\ No newline at end of file
-- 
GitLab