From 40ae03b48a4bba0c79f804cbe5ac82bcc0679344 Mon Sep 17 00:00:00 2001
From: Tom Wilkie <tomwilkie@users.noreply.github.com>
Date: Mon, 11 Feb 2019 12:05:11 +0000
Subject: [PATCH] Include the stream's labels in OOO error responses. (#304)

* Include the stream's labels in OOO error responses.

Also, read the body of error responses and log them, and retry 500s and connection errors (see the sketch at the end of this message).

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>

* Log close errors.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>

* Update pkg/promtail/client/client.go

Co-Authored-By: tomwilkie <tomwilkie@users.noreply.github.com>

* Log retries at warn, final errors at error.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
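
For reference, a minimal standalone sketch of the retry and logging behaviour
described above, using the same cortex backoff helper and go-kit logger as the
patch; trySend is a hypothetical stand-in for the real HTTP push, and the
backoff values mirror the new flag defaults:

    package main

    import (
    	"context"
    	"errors"
    	"os"
    	"time"

    	"github.com/cortexproject/cortex/pkg/util"
    	"github.com/go-kit/kit/log"
    	"github.com/go-kit/kit/log/level"
    )

    // trySend stands in for the real HTTP push; it always fails with a 500
    // here so the retry and logging flow below is visible.
    func trySend() (int, error) { return 500, errors.New("server error") }

    func main() {
    	logger := log.NewLogfmtLogger(os.Stderr)
    	backoff := util.NewBackoff(context.Background(), util.BackoffConfig{
    		MinBackoff: 100 * time.Millisecond,
    		MaxBackoff: 5 * time.Second,
    		MaxRetries: 5,
    	})

    	var (
    		status int
    		err    error
    	)
    	for backoff.Ongoing() {
    		if status, err = trySend(); err == nil {
    			return
    		}
    		// Only 5xx responses and connection-level errors (no HTTP status) retry.
    		if status > 0 && status/100 != 5 {
    			break
    		}
    		// Each failed attempt is logged at warn...
    		level.Warn(logger).Log("msg", "error sending batch", "status", status, "error", err)
    		backoff.Wait()
    	}
    	// ...and only the final failure, once retries are exhausted, at error.
    	if err != nil {
    		level.Error(logger).Log("msg", "final error sending batch", "status", status, "error", err)
    	}
    }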
---
 pkg/chunkenc/interface.go     |   7 +--
 pkg/ingester/stream.go        |   6 ++
 pkg/promtail/client/client.go | 107 ++++++++++++++++++++++++----------
 3 files changed, 85 insertions(+), 35 deletions(-)

diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go
index 50082f54..b0135876 100644
--- a/pkg/chunkenc/interface.go
+++ b/pkg/chunkenc/interface.go
@@ -3,19 +3,16 @@ package chunkenc
 import (
 	"errors"
 	"io"
-	"net/http"
 	"time"
 
-	"github.com/weaveworks/common/httpgrpc"
-
 	"github.com/grafana/loki/pkg/iter"
 	"github.com/grafana/loki/pkg/logproto"
 )
 
 // Errors returned by the chunk interface.
 var (
-	ErrChunkFull       = errors.New("Chunk full")
-	ErrOutOfOrder      = httpgrpc.Errorf(http.StatusBadRequest, "Entry out of order")
+	ErrChunkFull       = errors.New("chunk full")
+	ErrOutOfOrder      = errors.New("entry out of order")
 	ErrInvalidSize     = errors.New("invalid size")
 	ErrInvalidFlag     = errors.New("invalid flag")
 	ErrInvalidChecksum = errors.New("invalid checksum")
diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go
index 35c6d539..f2a6529a 100644
--- a/pkg/ingester/stream.go
+++ b/pkg/ingester/stream.go
@@ -2,11 +2,13 @@ package ingester
 
 import (
 	"context"
+	"net/http"
 	"time"
 
 	"github.com/cortexproject/cortex/pkg/ingester/client"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
+	"github.com/weaveworks/common/httpgrpc"
 
 	"github.com/grafana/loki/pkg/chunkenc"
 	"github.com/grafana/loki/pkg/iter"
@@ -93,6 +95,10 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
 		chunk.lastUpdated = entries[i].Timestamp
 	}
 
+	if appendErr == chunkenc.ErrOutOfOrder {
+		return httpgrpc.Errorf(http.StatusBadRequest, "entry out of order for stream: %s", client.FromLabelPairsToLabels(s.labels).String())
+	}
+
 	return appendErr
 }
 
diff --git a/pkg/promtail/client/client.go b/pkg/promtail/client/client.go
index 7f447524..7ad3bb7d 100644
--- a/pkg/promtail/client/client.go
+++ b/pkg/promtail/client/client.go
@@ -1,14 +1,19 @@
 package client
 
 import (
+	"bufio"
 	"bytes"
+	"context"
 	"flag"
 	"fmt"
+	"io"
 	"net/http"
 	"strconv"
 	"sync"
 	"time"
 
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/cortexproject/cortex/pkg/util/flagext"
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/gogo/protobuf/proto"
@@ -16,12 +21,12 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 
-	"github.com/cortexproject/cortex/pkg/util/flagext"
-
+	"github.com/grafana/loki/pkg/helpers"
 	"github.com/grafana/loki/pkg/logproto"
 )
 
 const contentType = "application/x-protobuf"
+const maxErrMsgLen = 1024
 
 var (
 	sentBytes = prometheus.NewCounter(prometheus.CounterOpts{
@@ -47,7 +52,8 @@ type Config struct {
 	BatchWait time.Duration
 	BatchSize int
 
-	ExternalLabels model.LabelSet `yaml:"external_labels,omitempty"`
+	BackoffConfig  util.BackoffConfig `yaml:"backoff_config"`
+	ExternalLabels model.LabelSet     `yaml:"external_labels,omitempty"`
 }
 
 // RegisterFlags registers flags.
@@ -55,6 +61,10 @@ func (c *Config) RegisterFlags(flags *flag.FlagSet) {
 	flags.Var(&c.URL, "client.url", "URL of log server")
 	flags.DurationVar(&c.BatchWait, "client.batch-wait", 1*time.Second, "Maximum wait period before sending batch.")
 	flags.IntVar(&c.BatchSize, "client.batch-size-bytes", 100*1024, "Maximum batch size to accrue before sending. ")
+
+	flags.IntVar(&c.BackoffConfig.MaxRetries, "client.max-retries", 5, "Maximum number of retries when sending batches.")
+	flags.DurationVar(&c.BackoffConfig.MinBackoff, "client.min-backoff", 100*time.Millisecond, "Initial backoff time between retries.")
+	flags.DurationVar(&c.BackoffConfig.MaxBackoff, "client.max-backoff", 5*time.Second, "Maximum backoff time between retries.")
 }
 
 // Client for pushing logs in snappy-compressed protos over HTTP.
@@ -76,10 +86,11 @@ type entry struct {
 // New makes a new Client.
 func New(cfg Config, logger log.Logger) (*Client, error) {
 	c := &Client{
-		logger:         logger,
-		cfg:            cfg,
-		quit:           make(chan struct{}),
-		entries:        make(chan entry),
+		logger:  logger,
+		cfg:     cfg,
+		quit:    make(chan struct{}),
+		entries: make(chan entry),
+
 		externalLabels: cfg.ExternalLabels,
 	}
 	c.wg.Add(1)
@@ -93,9 +104,7 @@ func (c *Client) run() {
 	maxWait := time.NewTimer(c.cfg.BatchWait)
 
 	defer func() {
-		if err := c.send(batch); err != nil {
-			level.Error(c.logger).Log("msg", "error sending batch", "error", err)
-		}
+		c.sendBatch(batch)
 		c.wg.Done()
 	}()
 
@@ -104,11 +113,10 @@ func (c *Client) run() {
 		select {
 		case <-c.quit:
 			return
+
 		case e := <-c.entries:
 			if batchSize+len(e.Line) > c.cfg.BatchSize {
-				if err := c.send(batch); err != nil {
-					level.Error(c.logger).Log("msg", "error sending batch", "error", err)
-				}
+				c.sendBatch(batch)
 				batchSize = 0
 				batch = map[model.Fingerprint]*logproto.Stream{}
 			}
@@ -123,11 +131,10 @@ func (c *Client) run() {
 				batch[fp] = stream
 			}
 			stream.Entries = append(stream.Entries, e.Entry)
+
 		case <-maxWait.C:
 			if len(batch) > 0 {
-				if err := c.send(batch); err != nil {
-					level.Error(c.logger).Log("msg", "error sending batch", "error", err)
-				}
+				c.sendBatch(batch)
 				batchSize = 0
 				batch = map[model.Fingerprint]*logproto.Stream{}
 			}
@@ -135,37 +142,77 @@ func (c *Client) run() {
 	}
 }
 
-func (c *Client) send(batch map[model.Fingerprint]*logproto.Stream) error {
+func (c *Client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
+	buf, err := encodeBatch(batch)
+	if err != nil {
+		level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
+		return
+	}
+	sentBytes.Add(float64(len(buf)))
+
+	ctx := context.Background()
+	backoff := util.NewBackoff(ctx, c.cfg.BackoffConfig)
+	var status int
+	for backoff.Ongoing() {
+		start := time.Now()
+		status, err = c.send(ctx, buf)
+		requestDuration.WithLabelValues(strconv.Itoa(status)).Observe(time.Since(start).Seconds())
+		if err == nil {
+			return
+		}
+
+		// Only retry 500s and connection-level errors.
+		if status > 0 && status/100 != 5 {
+			break
+		}
+
+		level.Warn(c.logger).Log("msg", "error sending batch", "status", status, "error", err)
+		backoff.Wait()
+	}
+
+	if err != nil {
+		level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
+	}
+}
+
+func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, error) {
 	req := logproto.PushRequest{
 		Streams: make([]*logproto.Stream, 0, len(batch)),
 	}
-	count := 0
 	for _, stream := range batch {
 		req.Streams = append(req.Streams, stream)
-		count += len(stream.Entries)
 	}
 	buf, err := proto.Marshal(&req)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	buf = snappy.Encode(nil, buf)
-	sentBytes.Add(float64(len(buf)))
+	return buf, nil
+}
 
-	start := time.Now()
-	resp, err := http.Post(c.cfg.URL.String(), contentType, bytes.NewReader(buf))
+func (c *Client) send(ctx context.Context, buf []byte) (int, error) {
+	req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf))
 	if err != nil {
-		requestDuration.WithLabelValues("failed").Observe(time.Since(start).Seconds())
-		return err
+		return -1, err
 	}
-	if err := resp.Body.Close(); err != nil {
-		return err
+	req = req.WithContext(ctx)
+	req.Header.Set("Content-Type", contentType)
+
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return -1, err
 	}
-	requestDuration.WithLabelValues(strconv.Itoa(resp.StatusCode)).Observe(time.Since(start).Seconds())
+	defer helpers.LogError("closing response body", resp.Body.Close)
 
 	if resp.StatusCode/100 != 2 {
-		return fmt.Errorf("Error doing write: %d - %s", resp.StatusCode, resp.Status)
+		scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen))
+		line := ""
+		if scanner.Scan() {
+			line = scanner.Text()
+		}
+		err = fmt.Errorf("server returned HTTP status %s: %s", resp.Status, line)
 	}
-	return nil
+	return resp.StatusCode, err
 }
 
 // Stop the client.
-- 
GitLab