From ab4c5bebab7f3e8a371e3caa9e1b8b4e55369d21 Mon Sep 17 00:00:00 2001 From: Tom Wilkie <tomwilkie@users.noreply.github.com> Date: Thu, 31 Jan 2019 10:01:59 +0000 Subject: [PATCH] Validate labels in the distributor. (#251) Also, make ErrOutOfOrder a HTTP 4xx error and continue to append even when we encourter an out of order entry. Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com> --- pkg/chunkenc/interface.go | 5 ++++- pkg/distributor/distributor.go | 27 +++++++++++++++++++++++---- pkg/ingester/instance.go | 29 ++++++++--------------------- pkg/ingester/stream.go | 7 +++++-- pkg/loki/loki.go | 1 + pkg/loki/modules.go | 28 +++++++++++++++++++--------- pkg/util/conv.go | 24 ++++++++++++++++++++++++ 7 files changed, 84 insertions(+), 37 deletions(-) create mode 100644 pkg/util/conv.go diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index 8ec63fc9..50082f54 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -3,8 +3,11 @@ package chunkenc import ( "errors" "io" + "net/http" "time" + "github.com/weaveworks/common/httpgrpc" + "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" ) @@ -12,7 +15,7 @@ import ( // Errors returned by the chunk interface. var ( ErrChunkFull = errors.New("Chunk full") - ErrOutOfOrder = errors.New("Entry out of order") + ErrOutOfOrder = httpgrpc.Errorf(http.StatusBadRequest, "Entry out of order") ErrInvalidSize = errors.New("invalid size") ErrInvalidFlag = errors.New("invalid flag") ErrInvalidChecksum = errors.New("invalid checksum") diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index e85409fe..bc140c31 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -8,7 +8,8 @@ import ( cortex_client "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" - "github.com/cortexproject/cortex/pkg/util" + cortex_util "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/validation" opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -17,6 +18,7 @@ import ( "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/util" ) var ( @@ -56,11 +58,12 @@ type Distributor struct { cfg Config clientCfg client.Config ring ring.ReadRing + overrides *validation.Overrides pool *cortex_client.Pool } // New a distributor creates. -func New(cfg Config, clientCfg client.Config, ring ring.ReadRing) (*Distributor, error) { +func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, overrides *validation.Overrides) (*Distributor, error) { factory := func(addr string) (grpc_health_v1.HealthClient, error) { return client.New(clientCfg, addr) } @@ -69,7 +72,8 @@ func New(cfg Config, clientCfg client.Config, ring ring.ReadRing) (*Distributor, cfg: cfg, clientCfg: clientCfg, ring: ring, - pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, util.Logger), + overrides: overrides, + pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, cortex_util.Logger), }, nil } @@ -114,7 +118,13 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log // We also work out the hash value at the same time. streams := make([]streamTracker, len(req.Streams)) keys := make([]uint32, 0, len(req.Streams)) + var validationErr error for i, stream := range req.Streams { + if err := d.validateLabels(userID, stream.Labels); err != nil { + validationErr = err + continue + } + keys = append(keys, tokenFor(userID, stream.Labels)) streams[i].stream = stream } @@ -160,10 +170,19 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log case err := <-tracker.err: return nil, err case <-tracker.done: - return &logproto.PushResponse{}, nil + return &logproto.PushResponse{}, validationErr } } +func (d *Distributor) validateLabels(userID, labels string) error { + ls, err := util.ToClientLabels(labels) + if err != nil { + return err + } + + return d.overrides.ValidateLabels(userID, ls) +} + // TODO taken from Cortex, see if we can refactor out an usable interface. func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) { err := d.sendSamplesErr(ctx, ingester, streamTrackers) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index d5e405af..502fee4c 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -12,13 +12,13 @@ import ( "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ingester/index" - "github.com/cortexproject/cortex/pkg/util/wire" "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/parser" "github.com/grafana/loki/pkg/querier" + "github.com/grafana/loki/pkg/util" ) const queryBatchSize = 128 @@ -67,10 +67,12 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { i.streamsMtx.Lock() defer i.streamsMtx.Unlock() + var appendErr error for _, s := range req.Streams { - labels, err := toClientLabels(s.Labels) + labels, err := util.ToClientLabels(s.Labels) if err != nil { - return err + appendErr = err + continue } fp := client.FastFingerprint(labels) @@ -83,11 +85,12 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { } if err := stream.Push(ctx, s.Entries); err != nil { - return err + appendErr = err + continue } } - return nil + return appendErr } func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error { @@ -183,19 +186,3 @@ func sendBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer, } return nil } - -func toClientLabels(labels string) ([]client.LabelPair, error) { - ls, err := parser.Labels(labels) - if err != nil { - return nil, err - } - - pairs := make([]client.LabelPair, 0, len(ls)) - for i := 0; i < len(ls); i++ { - pairs = append(pairs, client.LabelPair{ - Name: wire.Bytes(ls[i].Name), - Value: wire.Bytes(ls[i].Value), - }) - } - return pairs, nil -} diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index f0d34e45..18393b66 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -69,6 +69,9 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error { chunksCreatedTotal.Inc() } + // Don't fail on the first append error - if samples are sent out of order, + // we still want to append the later ones. + var appendErr error for i := range entries { if s.chunks[0].closed || !s.chunks[0].chunk.SpaceFor(&entries[i]) { samplesPerChunk.Observe(float64(s.chunks[0].chunk.Size())) @@ -78,11 +81,11 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error { chunksCreatedTotal.Inc() } if err := s.chunks[len(s.chunks)-1].chunk.Append(&entries[i]); err != nil { - return err + appendErr = err } } - return nil + return appendErr } // Returns an iterator. diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index e215b3ad..ce08d4b0 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -62,6 +62,7 @@ type Loki struct { server *server.Server ring *ring.Ring + overrides *validation.Overrides distributor *distributor.Distributor ingester *ingester.Ingester querier *querier.Querier diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 81ea6f0a..ec2c0e2f 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -26,6 +26,7 @@ type moduleName int // The various modules that make up Loki. const ( Ring moduleName = iota + Overrides Server Distributor Ingester @@ -38,6 +39,8 @@ func (m moduleName) String() string { switch m { case Ring: return "ring" + case Overrides: + return "overrides" case Server: return "server" case Distributor: @@ -60,6 +63,9 @@ func (m *moduleName) Set(s string) error { case "ring": *m = Ring return nil + case "overrides": + *m = Overrides + return nil case "server": *m = Server return nil @@ -97,8 +103,13 @@ func (t *Loki) initRing() (err error) { return } +func (t *Loki) initOverrides() (err error) { + t.overrides, err = validation.NewOverrides(t.cfg.LimitsConfig) + return err +} + func (t *Loki) initDistributor() (err error) { - t.distributor, err = distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring) + t.distributor, err = distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring, t.overrides) if err != nil { return } @@ -158,13 +169,7 @@ func (t *Loki) stopIngester() error { } func (t *Loki) initStore() (err error) { - var overrides *validation.Overrides - overrides, err = validation.NewOverrides(t.cfg.LimitsConfig) - if err != nil { - return err - } - - t.store, err = storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, overrides) + t.store, err = storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides) return } @@ -189,12 +194,17 @@ var modules = map[moduleName]module{ init: (*Loki).initRing, }, + Overrides: { + init: (*Loki).initOverrides, + }, + Distributor: { - deps: []moduleName{Ring, Server}, + deps: []moduleName{Ring, Server, Overrides}, init: (*Loki).initDistributor, }, Store: { + deps: []moduleName{Overrides}, init: (*Loki).initStore, stop: (*Loki).stopStore, }, diff --git a/pkg/util/conv.go b/pkg/util/conv.go new file mode 100644 index 00000000..b44bed5c --- /dev/null +++ b/pkg/util/conv.go @@ -0,0 +1,24 @@ +package util + +import ( + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util/wire" + "github.com/grafana/loki/pkg/parser" +) + +// ToClientLabels parses the labels and converts them to the Cortex type. +func ToClientLabels(labels string) ([]client.LabelPair, error) { + ls, err := parser.Labels(labels) + if err != nil { + return nil, err + } + + pairs := make([]client.LabelPair, 0, len(ls)) + for i := 0; i < len(ls); i++ { + pairs = append(pairs, client.LabelPair{ + Name: wire.Bytes(ls[i].Name), + Value: wire.Bytes(ls[i].Value), + }) + } + return pairs, nil +} -- GitLab