From 53075db577c72a5649fdb50020590382812bf0f9 Mon Sep 17 00:00:00 2001 From: Cyril Tovena <cyril.tovena@gmail.com> Date: Tue, 7 May 2019 11:52:58 -0400 Subject: [PATCH] Send logs to multiple loki instances (#536) * Adds the ability to provide multiple Loki URLs. For backward compatibility, the single `client:` section (and its `client.*` flags) still works. * add some tests for multi client * update ksonnet module to support multiple clients * fix comment * fix lint issues --- cmd/promtail/promtail-docker-config.yaml | 4 +- cmd/promtail/promtail-local-config.yaml | 4 +- pkg/promtail/client/client.go | 67 +++++------- pkg/promtail/client/config.go | 57 ++++++++++ pkg/promtail/client/fake/client.go | 24 ++++ pkg/promtail/client/multi.go | 43 ++++++++ pkg/promtail/client/multi_test.go | 103 ++++++++++++++++++ pkg/promtail/config/config.go | 4 +- pkg/promtail/promtail.go | 9 +- pkg/util/errors.go | 48 ++++++++ production/ksonnet/README.md | 16 ++- production/ksonnet/promtail/config.libsonnet | 18 ++- .../ksonnet/promtail/promtail.libsonnet | 13 ++- 13 files changed, 342 insertions(+), 68 deletions(-) create mode 100644 pkg/promtail/client/config.go create mode 100644 pkg/promtail/client/fake/client.go create mode 100644 pkg/promtail/client/multi.go create mode 100644 pkg/promtail/client/multi_test.go create mode 100644 pkg/util/errors.go diff --git a/cmd/promtail/promtail-docker-config.yaml b/cmd/promtail/promtail-docker-config.yaml index 8dc8f273..b14c2300 100644 --- a/cmd/promtail/promtail-docker-config.yaml +++ b/cmd/promtail/promtail-docker-config.yaml @@ -5,8 +5,8 @@ server: positions: filename: /tmp/positions.yaml -client: - url: http://loki:3100/api/prom/push +clients: + - url: http://loki:3100/api/prom/push scrape_configs: - job_name: system diff --git a/cmd/promtail/promtail-local-config.yaml b/cmd/promtail/promtail-local-config.yaml index d5b13c5e..d2f34c5f 100644 --- a/cmd/promtail/promtail-local-config.yaml +++ b/cmd/promtail/promtail-local-config.yaml @@ -5,8 +5,8 @@ server: positions: filename: 
/tmp/positions.yaml -client: - url: http://localhost:3100/api/prom/push +clients: + - url: http://localhost:3100/api/prom/push scrape_configs: - job_name: system diff --git a/pkg/promtail/client/client.go b/pkg/promtail/client/client.go index 12743180..8296bced 100644 --- a/pkg/promtail/client/client.go +++ b/pkg/promtail/client/client.go @@ -4,7 +4,6 @@ import ( "bufio" "bytes" "context" - "flag" "fmt" "io" "net/http" @@ -12,8 +11,9 @@ import ( "sync" "time" + "github.com/grafana/loki/pkg/promtail/api" + "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/gogo/protobuf/proto" @@ -29,21 +29,21 @@ const contentType = "application/x-protobuf" const maxErrMsgLen = 1024 var ( - encodedBytes = prometheus.NewCounter(prometheus.CounterOpts{ + encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "promtail", Name: "encoded_bytes_total", Help: "Number of bytes encoded and ready to send.", - }) - sentBytes = prometheus.NewCounter(prometheus.CounterOpts{ + }, []string{"host"}) + sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "promtail", Name: "sent_bytes_total", Help: "Number of bytes sent.", - }) + }, []string{"host"}) requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "promtail", Name: "request_duration_seconds", Help: "Duration of send requests.", - }, []string{"status_code"}) + }, []string{"status_code", "host"}) ) func init() { @@ -52,32 +52,15 @@ func init() { prometheus.MustRegister(requestDuration) } -// Config describes configuration for a HTTP pusher client. 
-type Config struct { - URL flagext.URLValue - BatchWait time.Duration - BatchSize int - - BackoffConfig util.BackoffConfig `yaml:"backoff_config"` - // The labels to add to any time series or alerts when communicating with loki - ExternalLabels model.LabelSet `yaml:"external_labels,omitempty"` - Timeout time.Duration `yaml:"timeout"` -} - -// RegisterFlags registers flags. -func (c *Config) RegisterFlags(flags *flag.FlagSet) { - flags.Var(&c.URL, "client.url", "URL of log server") - flags.DurationVar(&c.BatchWait, "client.batch-wait", 1*time.Second, "Maximum wait period before sending batch.") - flags.IntVar(&c.BatchSize, "client.batch-size-bytes", 100*1024, "Maximum batch size to accrue before sending. ") - - flag.IntVar(&c.BackoffConfig.MaxRetries, "client.max-retries", 5, "Maximum number of retires when sending batches.") - flag.DurationVar(&c.BackoffConfig.MinBackoff, "client.min-backoff", 100*time.Millisecond, "Initial backoff time between retries.") - flag.DurationVar(&c.BackoffConfig.MaxBackoff, "client.max-backoff", 5*time.Second, "Maximum backoff time between retries.") - flag.DurationVar(&c.Timeout, "client.timeout", 10*time.Second, "Maximum time to wait for server to respond to a request") +// Client pushes entries to Loki and can be stopped +type Client interface { + api.EntryHandler + // Stop goroutine sending batch of entries. + Stop() } // Client for pushing logs in snappy-compressed protos over HTTP. -type Client struct { +type client struct { logger log.Logger cfg Config quit chan struct{} @@ -93,9 +76,9 @@ type entry struct { } // New makes a new Client. 
-func New(cfg Config, logger log.Logger) (*Client, error) { - c := &Client{ - logger: logger, +func New(cfg Config, logger log.Logger) Client { + c := &client{ + logger: log.With(logger, "component", "client", "host", cfg.URL.Host), cfg: cfg, quit: make(chan struct{}), entries: make(chan entry), @@ -104,10 +87,10 @@ func New(cfg Config, logger log.Logger) (*Client, error) { } c.wg.Add(1) go c.run() - return c, nil + return c } -func (c *Client) run() { +func (c *client) run() { batch := map[model.Fingerprint]*logproto.Stream{} batchSize := 0 maxWait := time.NewTimer(c.cfg.BatchWait) @@ -151,14 +134,14 @@ func (c *Client) run() { } } -func (c *Client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) { +func (c *client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) { buf, err := encodeBatch(batch) if err != nil { level.Error(c.logger).Log("msg", "error encoding batch", "error", err) return } bufBytes := float64(len(buf)) - encodedBytes.Add(bufBytes) + encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) ctx := context.Background() backoff := util.NewBackoff(ctx, c.cfg.BackoffConfig) @@ -166,10 +149,10 @@ func (c *Client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) { for backoff.Ongoing() { start := time.Now() status, err = c.send(ctx, buf) - requestDuration.WithLabelValues(strconv.Itoa(status)).Observe(time.Since(start).Seconds()) + requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds()) if err == nil { - sentBytes.Add(bufBytes) + sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) return } @@ -202,7 +185,7 @@ func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, error) { return buf, nil } -func (c *Client) send(ctx context.Context, buf []byte) (int, error) { +func (c *client) send(ctx context.Context, buf []byte) (int, error) { ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout) defer cancel() req, err := http.NewRequest("POST", c.cfg.URL.String(), 
bytes.NewReader(buf)) @@ -230,13 +213,13 @@ func (c *Client) send(ctx context.Context, buf []byte) (int, error) { } // Stop the client. -func (c *Client) Stop() { +func (c *client) Stop() { close(c.quit) c.wg.Wait() } // Handle implement EntryHandler; adds a new line to the next batch; send is async. -func (c *Client) Handle(ls model.LabelSet, t time.Time, s string) error { +func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error { if len(c.externalLabels) > 0 { ls = c.externalLabels.Merge(ls) } diff --git a/pkg/promtail/client/config.go b/pkg/promtail/client/config.go new file mode 100644 index 00000000..bcc3d04b --- /dev/null +++ b/pkg/promtail/client/config.go @@ -0,0 +1,57 @@ +package client + +import ( + "flag" + "time" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/prometheus/common/model" +) + +// Config describes configuration for a HTTP pusher client. +type Config struct { + URL flagext.URLValue + BatchWait time.Duration + BatchSize int + + BackoffConfig util.BackoffConfig `yaml:"backoff_config"` + // The labels to add to any time series or alerts when communicating with loki + ExternalLabels model.LabelSet `yaml:"external_labels,omitempty"` + Timeout time.Duration `yaml:"timeout"` +} + +// RegisterFlags registers flags. +func (c *Config) RegisterFlags(flags *flag.FlagSet) { + flags.Var(&c.URL, "client.url", "URL of log server") + flags.DurationVar(&c.BatchWait, "client.batch-wait", 1*time.Second, "Maximum wait period before sending batch.") + flags.IntVar(&c.BatchSize, "client.batch-size-bytes", 100*1024, "Maximum batch size to accrue before sending. 
") + + flag.IntVar(&c.BackoffConfig.MaxRetries, "client.max-retries", 5, "Maximum number of retires when sending batches.") + flag.DurationVar(&c.BackoffConfig.MinBackoff, "client.min-backoff", 100*time.Millisecond, "Initial backoff time between retries.") + flag.DurationVar(&c.BackoffConfig.MaxBackoff, "client.max-backoff", 5*time.Second, "Maximum backoff time between retries.") + flag.DurationVar(&c.Timeout, "client.timeout", 10*time.Second, "Maximum time to wait for server to respond to a request") + +} + +// UnmarshalYAML implement Yaml Unmarshaler +func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { + type raw Config + // force sane defaults. + cfg := raw{ + BackoffConfig: util.BackoffConfig{ + MaxBackoff: 5 * time.Second, + MaxRetries: 5, + MinBackoff: 100 * time.Millisecond, + }, + BatchSize: 100 * 1024, + BatchWait: 1 * time.Second, + Timeout: 10 * time.Second, + } + if err := unmarshal(&cfg); err != nil { + return err + } + + *c = Config(cfg) + return nil +} diff --git a/pkg/promtail/client/fake/client.go b/pkg/promtail/client/fake/client.go new file mode 100644 index 00000000..a195690d --- /dev/null +++ b/pkg/promtail/client/fake/client.go @@ -0,0 +1,24 @@ +package fake + +import ( + "time" + + "github.com/grafana/loki/pkg/promtail/api" + "github.com/prometheus/common/model" +) + +// Client is a fake client used for testing. 
+type Client struct { + OnHandleEntry api.EntryHandlerFunc + OnStop func() +} + +// Stop implements client.Client +func (c *Client) Stop() { + c.OnStop() +} + +// Handle implements client.Client +func (c *Client) Handle(labels model.LabelSet, time time.Time, entry string) error { + return c.OnHandleEntry.Handle(labels, time, entry) +} diff --git a/pkg/promtail/client/multi.go b/pkg/promtail/client/multi.go new file mode 100644 index 00000000..d634e302 --- /dev/null +++ b/pkg/promtail/client/multi.go @@ -0,0 +1,43 @@ +package client + +import ( + "errors" + "time" + + "github.com/go-kit/kit/log" + "github.com/grafana/loki/pkg/util" + "github.com/prometheus/common/model" +) + +// MultiClient is client pushing to one or more loki instances. +type MultiClient []Client + +// NewMulti creates a new client +func NewMulti(logger log.Logger, cfgs ...Config) (Client, error) { + if len(cfgs) == 0 { + return nil, errors.New("at least one client config should be provided") + } + var clients []Client + for _, cfg := range cfgs { + clients = append(clients, New(cfg, logger)) + } + return MultiClient(clients), nil +} + +// Handle Implements api.EntryHandler +func (m MultiClient) Handle(labels model.LabelSet, time time.Time, entry string) error { + var result util.MultiError + for _, client := range m { + if err := client.Handle(labels, time, entry); err != nil { + result.Add(err) + } + } + return result.Err() +} + +// Stop implements Client +func (m MultiClient) Stop() { + for _, c := range m { + c.Stop() + } +} diff --git a/pkg/promtail/client/multi_test.go b/pkg/promtail/client/multi_test.go new file mode 100644 index 00000000..d31821c2 --- /dev/null +++ b/pkg/promtail/client/multi_test.go @@ -0,0 +1,103 @@ +package client + +import ( + "errors" + "net/url" + "reflect" + "testing" + "time" + + "github.com/grafana/loki/pkg/promtail/api" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" + 
"github.com/grafana/loki/pkg/promtail/client/fake" + "github.com/prometheus/common/model" +) + +func TestNewMulti(t *testing.T) { + _, err := NewMulti(util.Logger, []Config{}...) + if err == nil { + t.Fatal("expected err but got nil") + } + host1, _ := url.Parse("http://localhost:3100") + host2, _ := url.Parse("https://grafana.com") + expectedCfg1 := Config{BatchSize: 20, URL: flagext.URLValue{URL: host1}} + expectedCfg2 := Config{BatchSize: 10, URL: flagext.URLValue{URL: host2}} + + clients, err := NewMulti(util.Logger, expectedCfg1, expectedCfg2) + if err != nil { + t.Fatalf("expected err: nil got:%v", err) + } + multi := clients.(MultiClient) + if len(multi) != 2 { + t.Fatalf("expected client: 2 got:%d", len(multi)) + } + cfg1 := clients.(MultiClient)[0].(*client).cfg + + if !reflect.DeepEqual(cfg1, expectedCfg1) { + t.Fatalf("expected cfg: %v got:%v", expectedCfg1, cfg1) + } + + cfg2 := clients.(MultiClient)[1].(*client).cfg + + if !reflect.DeepEqual(cfg2, expectedCfg2) { + t.Fatalf("expected cfg: %v got:%v", expectedCfg2, cfg2) + } +} + +func TestMultiClient_Stop(t *testing.T) { + var stopped int + + stopping := func() { + stopped++ + } + fc := &fake.Client{OnStop: stopping} + clients := []Client{fc, fc, fc, fc} + m := MultiClient(clients) + + m.Stop() + + if stopped != len(clients) { + t.Fatal("missing stop call") + } +} + +func TestMultiClient_Handle(t *testing.T) { + + var called int + + errorFn := api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error { called++; return errors.New("") }) + okFn := api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error { called++; return nil }) + + errfc := &fake.Client{OnHandleEntry: errorFn} + okfc := &fake.Client{OnHandleEntry: okFn} + t.Run("some error", func(t *testing.T) { + clients := []Client{okfc, errfc, okfc, errfc, errfc, okfc} + m := MultiClient(clients) + + if err := m.Handle(nil, time.Now(), ""); err == nil { + t.Fatal("expected err got nil") + } + + 
if called != len(clients) { + t.Fatal("missing handle call") + } + + }) + t.Run("no error", func(t *testing.T) { + called = 0 + clients := []Client{okfc, okfc, okfc, okfc, okfc, okfc} + m := MultiClient(clients) + + if err := m.Handle(nil, time.Now(), ""); err != nil { + t.Fatal("expected err to be nil") + } + + if called != len(clients) { + t.Fatal("missing handle call") + } + + }) + +} diff --git a/pkg/promtail/config/config.go b/pkg/promtail/config/config.go index b96cb9d1..43cff9fe 100644 --- a/pkg/promtail/config/config.go +++ b/pkg/promtail/config/config.go @@ -12,8 +12,10 @@ import ( // Config for promtail, describing what files to watch. type Config struct { - ServerConfig server.Config `yaml:"server,omitempty"` + ServerConfig server.Config `yaml:"server,omitempty"` + // deprecated use ClientConfigs instead ClientConfig client.Config `yaml:"client,omitempty"` + ClientConfigs []client.Config `yaml:"clients,omitempty"` PositionsConfig positions.Config `yaml:"positions,omitempty"` ScrapeConfig []scrape.Config `yaml:"scrape_configs,omitempty"` TargetConfig targets.Config `yaml:"target_config,omitempty"` diff --git a/pkg/promtail/promtail.go b/pkg/promtail/promtail.go index 1a58453a..3fae7132 100644 --- a/pkg/promtail/promtail.go +++ b/pkg/promtail/promtail.go @@ -12,7 +12,7 @@ import ( // Promtail is the root struct for Promtail... type Promtail struct { - client *client.Client + client client.Client positions *positions.Positions targetManagers *targets.TargetManagers server *server.Server @@ -25,7 +25,12 @@ func New(cfg config.Config) (*Promtail, error) { return nil, err } - client, err := client.New(cfg.ClientConfig, util.Logger) + if cfg.ClientConfig.URL.URL != nil { + // if a single client config is used we add it to the multiple client config for backward compatibility + cfg.ClientConfigs = append(cfg.ClientConfigs, cfg.ClientConfig) + } + + client, err := client.NewMulti(util.Logger, cfg.ClientConfigs...) 
if err != nil { return nil, err } diff --git a/pkg/util/errors.go b/pkg/util/errors.go new file mode 100644 index 00000000..377b40a3 --- /dev/null +++ b/pkg/util/errors.go @@ -0,0 +1,48 @@ +package util + +import ( + "bytes" + "fmt" +) + +// The MultiError type implements the error interface, and contains the +// Errors used to construct it. +type MultiError []error + +// Returns a concatenated string of the contained errors +func (es MultiError) Error() string { + var buf bytes.Buffer + + if len(es) > 1 { + _, _ = fmt.Fprintf(&buf, "%d errors: ", len(es)) + } + + for i, err := range es { + if i != 0 { + buf.WriteString("; ") + } + buf.WriteString(err.Error()) + } + + return buf.String() +} + +// Add adds the error to the error list if it is not nil. +func (es *MultiError) Add(err error) { + if err == nil { + return + } + if merr, ok := err.(MultiError); ok { + *es = append(*es, merr...) + } else { + *es = append(*es, err) + } +} + +// Err returns the error list as an error or nil if it is empty. +func (es MultiError) Err() error { + if len(es) == 0 { + return nil + } + return es +} diff --git a/production/ksonnet/README.md b/production/ksonnet/README.md index a2f74ec6..66ef0ba0 100644 --- a/production/ksonnet/README.md +++ b/production/ksonnet/README.md @@ -40,15 +40,21 @@ promtail + { _config+:: { namespace: 'loki', - promtail_config: { - scheme: 'https', - hostname: 'logs-us-west1.grafana.net', - username: 'user-id', - password: 'password', + promtail_config+: { + clients: [ + { + scheme:: 'https', + hostname:: 'logs-us-west1.grafana.net', + username:: 'user-id', + password:: 'password', + external_labels: {}, + } + ], container_root_path: '/var/lib/docker', }, }, } + ``` Notice that `container_root_path` is your own data root for docker daemon, use `docker info | grep "Root Dir"` to get it. 
diff --git a/production/ksonnet/promtail/config.libsonnet b/production/ksonnet/promtail/config.libsonnet index 8b9f9a1f..01ace73f 100644 --- a/production/ksonnet/promtail/config.libsonnet +++ b/production/ksonnet/promtail/config.libsonnet @@ -6,19 +6,15 @@ _config+:: { prometheus_insecure_skip_verify: false, promtail_config: { - username: '', - password: '', - scheme: 'https', - hostname: 'logs-us-west1.grafana.net', + clients:[{ + username:: '', + password:: '', + scheme:: 'https', + hostname:: 'logs-us-west1.grafana.net', + external_labels: {}, + }], container_root_path: '/var/lib/docker', - external_labels: {}, entry_parser: 'docker', }, - - service_url: - if std.objectHas(self.promtail_config, 'username') then - '%(scheme)s://%(username)s:%(password)s@%(hostname)s/api/prom/push' % self.promtail_config - else - '%(scheme)s://%(hostname)s/api/prom/push' % self.promtail_config, }, } diff --git a/production/ksonnet/promtail/promtail.libsonnet b/production/ksonnet/promtail/promtail.libsonnet index 6b796465..bdd3b7d1 100644 --- a/production/ksonnet/promtail/promtail.libsonnet +++ b/production/ksonnet/promtail/promtail.libsonnet @@ -17,9 +17,17 @@ k + config + scrape_config { ]), promtail_config+:: { - client: { - external_labels: $._config.promtail_config.external_labels, + local service_url(client) = + if std.objectHasAll(client, 'username') then + '%(scheme)s://%(username)s:%(password)s@%(hostname)s/api/prom/push' % client + else + '%(scheme)s://%(hostname)s/api/prom/push' % client, + + local client_config(client) = client + { + url: service_url(client), }, + + clients: std.map(client_config,$._config.promtail_config.clients) }, local configMap = $.core.v1.configMap, @@ -31,7 +39,6 @@ k + config + scrape_config { }), promtail_args:: { - 'client.url': $._config.service_url, 'config.file': '/etc/promtail/promtail.yml', }, -- GitLab