diff --git a/cmd/loki-canary/main.go b/cmd/loki-canary/main.go index 40dab0d4701ea9990aa335e2835fa1c1f765b6e0..442fcc396a75701cfda0db0eaa44320859b0f124 100644 --- a/cmd/loki-canary/main.go +++ b/cmd/loki-canary/main.go @@ -1,11 +1,14 @@ package main import ( + "flag" "fmt" + "io/ioutil" "net/http" "net/url" "os" "os/signal" + "strconv" "time" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -17,22 +20,60 @@ import ( func main() { - c := comparator.NewComparator(os.Stderr, 1*time.Minute, 1*time.Second) + lName := flag.String("labelname", "name", "The label name for this instance of loki-canary to use in the log selector") + lVal := flag.String("labelvalue", "loki-canary", "The unique label value for this instance of loki-canary to use in the log selector") + usePodName := flag.Bool("usepod", false, "If true, loki-canary will read the pod name from /etc/loki-canary/pod_name as the unique label value") + port := flag.Int("port", 3500, "Port which loki-canary should expose metrics") + addr := flag.String("addr", "", "The Loki server URL:Port, e.g. loki:3100") + tls := flag.Bool("tls", false, "Does the loki connection use TLS?") + user := flag.String("user", "", "Loki username") + pass := flag.String("pass", "", "Loki password") - w := writer.NewWriter(os.Stdout, c, 10*time.Millisecond, 1024) + interval := flag.Duration("interval", 1000*time.Millisecond, "Duration between log entries") + size := flag.Int("size", 100, "Size in bytes of each log line") + wait := flag.Duration("wait", 60*time.Second, "Duration to wait for log entries before reporting them lost") + flag.Parse() + + val := *lVal + if *usePodName { + data, err := ioutil.ReadFile("/etc/loki-canary/name") + if err != nil { + panic(err) + } + val = string(data) + } + + if *addr == "" { + panic("Must specify a Loki address with -addr") + } + + var ui *url.Userinfo + if *user != "" { + ui = url.UserPassword(*user, *pass) + } + + scheme := "ws" + if *tls { + scheme = "wss" + } u := url.URL{ - Scheme: "ws", - Host: "loki:3100", + Scheme: scheme, + Host: *addr, + User: ui, Path: "/api/prom/tail", - RawQuery: "query=" + url.QueryEscape("{name=\"loki-canary\",stream=\"stdout\"}"), + RawQuery: "query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", *lName, val)), } + _, _ = fmt.Fprintf(os.Stderr, "Connecting to loki at %v, querying for label '%v' with value '%v'\n", u.String(), *lName, val) + + c := comparator.NewComparator(os.Stderr, *wait, 1*time.Second) + w := writer.NewWriter(os.Stdout, c, *interval, *size) r := reader.NewReader(os.Stderr, c, u, "", "") http.Handle("/metrics", promhttp.Handler()) go func() { - err := http.ListenAndServe(":2112", nil) + err := http.ListenAndServe(":"+strconv.Itoa(*port), nil) if err != nil { panic(err) } diff --git a/go.mod b/go.mod index 4703d635d1b0e0843af4099ed2fc0273f3490478..8e45d829f9a6f95e09ac098f6f9eaad2aacd1112 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/golang/protobuf v1.3.1 // indirect github.com/gorilla/websocket v1.4.0 github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829 + github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f github.com/prometheus/common v0.3.0 // indirect github.com/stretchr/testify v1.3.0 golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 // indirect diff --git a/pkg/comparator/comparator.go b/pkg/comparator/comparator.go index 7038acaee586b218164218e3a8170f073bddd432..c349752af74069750c73cfa3caca98b3c42586d1 100644 --- a/pkg/comparator/comparator.go +++ b/pkg/comparator/comparator.go @@ -16,13 +16,31 @@ const ( ) var ( - outOfOrderEntry = promauto.NewCounter(prometheus.CounterOpts{ - Name: "out_of_order_entry", - Help: "The total number of processed events", + totalEntries = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "loki_canary", + Name: "total_entries", + Help: "counts log entries written to the file", }) - missingEntry = promauto.NewCounter(prometheus.CounterOpts{ - Name: "missing_entry", - Help: "The total number of processed events", + outOfOrderEntries = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "loki_canary", + Name: "out_of_order_entries", + Help: "counts log entries received with a timestamp more recent than the others in the queue", + }) + missingEntries = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "loki_canary", + Name: "missing_entries", + Help: "counts log entries not received within the maxWait duration and is reported as missing", + }) + unexpectedEntries = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "loki_canary", + Name: "unexpected_entries", + Help: "counts a log entry received which was not expected (e.g. duplicate, received after reported missing)", + }) + responseLatency = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "loki_canary", + Name: "response_latency", + Help: "is how long it takes for log lines to be returned from Loki in seconds.", + Buckets: []float64{0.5, 1, 2.5, 5, 10, 30, 60}, }) ) @@ -60,21 +78,26 @@ func (c *Comparator) EntrySent(time time.Time) { c.entMtx.Lock() defer c.entMtx.Unlock() c.entries = append(c.entries, &time) + totalEntries.Inc() } +// EntryReceived removes the received entry from the buffer if it exists, reports on out of order entries received func (c *Comparator) EntryReceived(ts time.Time) { c.entMtx.Lock() defer c.entMtx.Unlock() // Output index k := 0 + matched := false for i, e := range c.entries { if ts.Equal(*e) { + matched = true // If this isn't the first item in the list we received it out of order if i != 0 { - outOfOrderEntry.Inc() + outOfOrderEntries.Inc() _, _ = fmt.Fprintf(c.w, ErrOutOfOrderEntry, e, c.entries[:i]) } + responseLatency.Observe(time.Now().Sub(ts).Seconds()) // Do not increment output index, effectively causing this element to be dropped } else { // If the current index doesn't match the output index, update the array with the correct position @@ -84,6 +107,9 @@ func (c *Comparator) EntryReceived(ts time.Time) { k++ } } + if !matched { + unexpectedEntries.Inc() + } // Nil out the pointers to any trailing elements which were removed from the slice for i := k; i < len(c.entries); i++ { c.entries[i] = nil // or the zero value of T @@ -120,7 +146,7 @@ func (c *Comparator) pruneEntries() { for i, e := range c.entries { // If the time is outside our range, assume the entry has been lost report and remove it if e.Before(time.Now().Add(-c.maxWait)) { - missingEntry.Inc() + missingEntries.Inc() _, _ = fmt.Fprintf(c.w, ErrEntryNotReceived, e, c.maxWait.Seconds()) } else { if i != k { diff --git a/pkg/comparator/comparator_test.go b/pkg/comparator/comparator_test.go index 37275e8aba5083994889e809102acbd02fa4d800..6e8cc5faad50bfbb903fb1370688eff512b2daa9 100644 --- a/pkg/comparator/comparator_test.go +++ b/pkg/comparator/comparator_test.go @@ -3,13 +3,20 @@ package comparator import ( "bytes" "fmt" + "sync" "testing" "time" + "github.com/prometheus/client_golang/prometheus" + io_prometheus_client "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" ) func TestComparatorEntryReceivedOutOfOrder(t *testing.T) { + outOfOrderEntries = &mockCounter{} + missingEntries = &mockCounter{} + unexpectedEntries = &mockCounter{} + actual := &bytes.Buffer{} c := NewComparator(actual, 1*time.Hour, 1*time.Hour) @@ -24,13 +31,26 @@ func TestComparatorEntryReceivedOutOfOrder(t *testing.T) { c.EntrySent(t4) c.EntryReceived(t1) + assert.Equal(t, 3, c.Size()) c.EntryReceived(t4) - expected := fmt.Sprintf(ErrOutOfOrderEntry, t4, []time.Time{t2, t3}) + assert.Equal(t, 2, c.Size()) + c.EntryReceived(t2) + c.EntryReceived(t3) + assert.Equal(t, 0, c.Size()) + expected := fmt.Sprintf(ErrOutOfOrderEntry, t4, []time.Time{t2, t3}) assert.Equal(t, expected, actual.String()) + + assert.Equal(t, 1, outOfOrderEntries.(*mockCounter).count) + assert.Equal(t, 0, unexpectedEntries.(*mockCounter).count) + assert.Equal(t, 0, missingEntries.(*mockCounter).count) } func TestComparatorEntryReceivedNotExpected(t *testing.T) { + outOfOrderEntries = &mockCounter{} + missingEntries = &mockCounter{} + unexpectedEntries = &mockCounter{} + actual := &bytes.Buffer{} c := NewComparator(actual, 1*time.Hour, 1*time.Hour) @@ -51,12 +71,19 @@ func TestComparatorEntryReceivedNotExpected(t *testing.T) { assert.Equal(t, 1, c.Size()) c.EntryReceived(t4) assert.Equal(t, 0, c.Size()) - expected := "" + expected := "" assert.Equal(t, expected, actual.String()) + + assert.Equal(t, 0, outOfOrderEntries.(*mockCounter).count) + assert.Equal(t, 1, unexpectedEntries.(*mockCounter).count) + assert.Equal(t, 0, missingEntries.(*mockCounter).count) } func TestEntryNeverReceived(t *testing.T) { + outOfOrderEntries = &mockCounter{} + missingEntries = &mockCounter{} + unexpectedEntries = &mockCounter{} actual := &bytes.Buffer{} c := NewComparator(actual, 5*time.Millisecond, 2*time.Millisecond) @@ -86,4 +113,39 @@ func TestEntryNeverReceived(t *testing.T) { assert.Equal(t, expected, actual.String()) assert.Equal(t, 0, c.Size()) + assert.Equal(t, 0, outOfOrderEntries.(*mockCounter).count) + assert.Equal(t, 0, unexpectedEntries.(*mockCounter).count) + assert.Equal(t, 1, missingEntries.(*mockCounter).count) + +} + +type mockCounter struct { + cLck sync.Mutex + count int +} + +func (m *mockCounter) Desc() *prometheus.Desc { + panic("implement me") +} + +func (m *mockCounter) Write(*io_prometheus_client.Metric) error { + panic("implement me") +} + +func (m *mockCounter) Describe(chan<- *prometheus.Desc) { + panic("implement me") +} + +func (m *mockCounter) Collect(chan<- prometheus.Metric) { + panic("implement me") +} + +func (m *mockCounter) Add(float64) { + panic("implement me") +} + +func (m *mockCounter) Inc() { + m.cLck.Lock() + defer m.cLck.Unlock() + m.count++ } diff --git a/pkg/reader/logproto.pb.go b/pkg/reader/logproto.pb.go index a555d6d9087a27c45fa8db1b0a37074f204fc3cf..39c6ac71f2b7951bf4ef3e962a78fe17cd60005a 100644 --- a/pkg/reader/logproto.pb.go +++ b/pkg/reader/logproto.pb.go @@ -3,19 +3,20 @@ package reader import ( - context "context" - fmt "fmt" + "context" + "fmt" + "io" + "math" + "reflect" + "strconv" + "strings" + "time" + _ "github.com/gogo/protobuf/gogoproto" - proto "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/proto" _ "github.com/gogo/protobuf/types" github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" - grpc "google.golang.org/grpc" - io "io" - math "math" - reflect "reflect" - strconv "strconv" - strings "strings" - time "time" + "google.golang.org/grpc" ) // Reference imports to suppress errors if they are not otherwise used. diff --git a/production/ksonnet/loki-canary/jsonnetfile.json b/production/ksonnet/loki-canary/jsonnetfile.json new file mode 100644 index 0000000000000000000000000000000000000000..c903ac17c07c99f3e161aa16e817c679706f4ddb --- /dev/null +++ b/production/ksonnet/loki-canary/jsonnetfile.json @@ -0,0 +1,14 @@ +{ + "dependencies": [ + { + "name": "ksonnet-util", + "source": { + "git": { + "remote": "https://github.com/grafana/jsonnet-libs", + "subdir": "ksonnet-util" + } + }, + "version": "master" + } + ] +} diff --git a/production/ksonnet/loki-canary/loki-canary.libsonnet b/production/ksonnet/loki-canary/loki-canary.libsonnet new file mode 100644 index 0000000000000000000000000000000000000000..195df2aaa6e26d3c5ddfa2f5507bdee15f6aa756 --- /dev/null +++ b/production/ksonnet/loki-canary/loki-canary.libsonnet @@ -0,0 +1,46 @@ +local k = import 'ksonnet-util/kausal.libsonnet'; + +k { + local container = $.core.v1.container, + + loki_canary_args:: {}, + + _images+:: { + loki_canary: 'loki-canary:latest', + }, + + loki_canary_container:: + container.new('loki-canary', $._images.loki_canary) + + container.withPorts($.core.v1.containerPort.new('http-metrics', 80)) + + container.withArgsMixin($.util.mapToFlags($.loki_canary_args)) + + container.withEnv([ + container.envType.fromFieldPath('HOSTNAME', 'spec.nodeName'), + ]), + + local daemonSet = $.extensions.v1beta1.daemonSet, + + local downwardApiMount(name, path, volumeMountMixin={}) = + local container = $.core.v1.container, + deployment = $.extensions.v1beta1.deployment, + volumeMount = $.core.v1.volumeMount, + volume = $.core.v1.volume, + addMount(c) = c + container.withVolumeMountsMixin( + volumeMount.new(name, path) + + volumeMountMixin, + ); + + deployment.mapContainers(addMount) + + deployment.mixin.spec.template.spec.withVolumesMixin([ + volume.withName(name) + + volume.mixin.downwardApi.withItems([ + { + path: "name", + fieldRef: { fieldPath: "metadata.name" }, + }, + ]), + ]), + + loki_canary_daemonset: + daemonSet.new('loki-canary', [$.loki_canary_container]) + + downwardApiMount('pod-name', '/etc/loki-canary'), +} \ No newline at end of file