Skip to content
Snippets Groups Projects
Commit 62d5122a authored by Marco Pracucci's avatar Marco Pracucci Committed by Ed Welch
Browse files

Promtail exports metrics on sent and dropped log entries

parent 2064f91b
No related branches found
No related tags found
No related merge requests found
......@@ -39,7 +39,11 @@ Promtail metrics:
- `promtail_read_bytes_total` Number of bytes read.
- `promtail_read_lines_total` Number of lines read.
- `promtail_request_duration_seconds_count` Number of send requests.
- `promtail_encoded_bytes_total` Number of bytes encoded and ready to send.
- `promtail_sent_bytes_total` Number of bytes sent.
- `promtail_dropped_bytes_total` Number of bytes dropped because they failed to be sent to the ingester after all retries.
- `promtail_sent_entries_total` Number of log entries sent to the ingester.
- `promtail_dropped_entries_total` Number of log entries dropped because they failed to be sent to the ingester after all retries.
Most of these metrics are counters and should continuously increase during normal operations:
......
......@@ -40,6 +40,21 @@ var (
Name: "sent_bytes_total",
Help: "Number of bytes sent.",
}, []string{"host"})
droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_bytes_total",
Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.",
}, []string{"host"})
sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_entries_total",
Help: "Number of log entries sent to the ingester.",
}, []string{"host"})
droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_entries_total",
Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.",
}, []string{"host"})
requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "promtail",
Name: "request_duration_seconds",
......@@ -50,6 +65,9 @@ var (
// init registers all of promtail's client metrics with the default
// Prometheus registry when the package is loaded. MustRegister panics
// if any collector is invalid or already registered.
func init() {
	prometheus.MustRegister(
		encodedBytes,
		sentBytes,
		droppedBytes,
		sentEntries,
		droppedEntries,
		requestDuration,
	)
}
......@@ -153,7 +171,7 @@ func (c *client) run() {
}
func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
buf, err := encodeBatch(batch)
buf, entriesCount, err := encodeBatch(batch)
if err != nil {
level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
return
......@@ -171,6 +189,7 @@ func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
if err == nil {
sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
return
}
......@@ -185,22 +204,28 @@ func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
if err != nil {
level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
}
}
func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, error) {
func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, int, error) {
req := logproto.PushRequest{
Streams: make([]*logproto.Stream, 0, len(batch)),
}
entriesCount := 0
for _, stream := range batch {
req.Streams = append(req.Streams, stream)
entriesCount += len(stream.Entries)
}
buf, err := proto.Marshal(&req)
if err != nil {
return nil, err
return nil, 0, err
}
buf = snappy.Encode(nil, buf)
return buf, nil
return buf, entriesCount, nil
}
func (c *client) send(ctx context.Context, buf []byte) (int, error) {
......
......@@ -3,6 +3,7 @@ package client
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
......@@ -14,17 +15,21 @@ import (
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/grafana/loki/pkg/logproto"
lokiflag "github.com/grafana/loki/pkg/util/flagext"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
)
func TestClient_Handle(t *testing.T) {
logEntries := []entry{
var (
logEntries = []entry{
{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(1, 0).UTC(), Line: "line1"}},
{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(2, 0).UTC(), Line: "line2"}},
{labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(3, 0).UTC(), Line: "line3"}},
}
)
func TestClient_Handle(t *testing.T) {
tests := map[string]struct {
clientBatchSize int
clientBatchWait time.Duration
......@@ -33,6 +38,7 @@ func TestClient_Handle(t *testing.T) {
inputEntries []entry
inputDelay time.Duration
expectedBatches [][]*logproto.Stream
expectedMetrics string
}{
"batch log entries together until the batch size is reached": {
clientBatchSize: 10,
......@@ -48,6 +54,11 @@ func TestClient_Handle(t *testing.T) {
{Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}},
},
},
expectedMetrics: `
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 3.0
`,
},
"batch log entries together until the batch wait time is reached": {
clientBatchSize: 10,
......@@ -64,6 +75,11 @@ func TestClient_Handle(t *testing.T) {
{Labels: "{}", Entries: []logproto.Entry{logEntries[1].Entry}},
},
},
expectedMetrics: `
# HELP promtail_sent_entries_total Number of log entries sent to the ingester.
# TYPE promtail_sent_entries_total counter
promtail_sent_entries_total{host="__HOST__"} 2.0
`,
},
"retry send a batch up to backoff's max retries in case the server responds with a 5xx": {
clientBatchSize: 10,
......@@ -82,6 +98,11 @@ func TestClient_Handle(t *testing.T) {
{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}},
},
},
expectedMetrics: `
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__"} 1.0
`,
},
"do not retry send a batch in case the server responds with a 4xx": {
clientBatchSize: 10,
......@@ -94,11 +115,20 @@ func TestClient_Handle(t *testing.T) {
{Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}},
},
},
expectedMetrics: `
# HELP promtail_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE promtail_dropped_entries_total counter
promtail_dropped_entries_total{host="__HOST__"} 1.0
`,
},
}
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
// Reset metrics
sentEntries.Reset()
droppedEntries.Reset()
// Create a buffer channel where we do enqueue received requests
receivedReqsChan := make(chan logproto.PushRequest, 10)
......@@ -156,6 +186,55 @@ func TestClient_Handle(t *testing.T) {
for i, batch := range receivedReqs {
assert.Equal(t, testData.expectedBatches[i], batch.Streams)
}
expectedMetrics := strings.Replace(testData.expectedMetrics, "__HOST__", serverURL.Host, -1)
err = testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(expectedMetrics), "promtail_sent_entries_total", "promtail_dropped_entries_total")
assert.NoError(t, err)
})
}
}
// TestClient_encodeBatch verifies that encodeBatch reports the correct
// total entry count across batches with zero, one, or many streams.
func TestClient_encodeBatch(t *testing.T) {
	t.Parallel()

	tests := map[string]struct {
		inputBatch           map[model.Fingerprint]*logproto.Stream
		expectedEntriesCount int
	}{
		"empty batch": {
			inputBatch:           map[model.Fingerprint]*logproto.Stream{},
			expectedEntriesCount: 0,
		},
		"single stream with single log entry": {
			inputBatch: map[model.Fingerprint]*logproto.Stream{
				model.Fingerprint(1): {Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry}},
			},
			expectedEntriesCount: 1,
		},
		"single stream with multiple log entries": {
			inputBatch: map[model.Fingerprint]*logproto.Stream{
				model.Fingerprint(1): {Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}},
			},
			expectedEntriesCount: 2,
		},
		"multiple streams with multiple log entries": {
			inputBatch: map[model.Fingerprint]*logproto.Stream{
				model.Fingerprint(1): {Labels: "{}", Entries: []logproto.Entry{logEntries[0].Entry, logEntries[1].Entry}},
				model.Fingerprint(2): {Labels: "{}", Entries: []logproto.Entry{logEntries[2].Entry}},
			},
			expectedEntriesCount: 3,
		},
	}

	for name, tc := range tests {
		tc := tc // capture range variable for the parallel subtest

		t.Run(name, func(t *testing.T) {
			t.Parallel()

			// The encoded payload itself is exercised elsewhere; here we
			// only care about the reported entry count.
			_, count, err := encodeBatch(tc.inputBatch)
			require.NoError(t, err)
			assert.Equal(t, tc.expectedEntriesCount, count)
		})
	}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment