diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index 8ec63fc92905eaad7397aa1af649e8b311c765a0..50082f54b61440261306a0ee57738e92076a14d0 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -3,8 +3,11 @@ package chunkenc import ( "errors" "io" + "net/http" "time" + "github.com/weaveworks/common/httpgrpc" + "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" ) @@ -12,7 +15,7 @@ import ( // Errors returned by the chunk interface. var ( ErrChunkFull = errors.New("Chunk full") - ErrOutOfOrder = errors.New("Entry out of order") + ErrOutOfOrder = httpgrpc.Errorf(http.StatusBadRequest, "Entry out of order") ErrInvalidSize = errors.New("invalid size") ErrInvalidFlag = errors.New("invalid flag") ErrInvalidChecksum = errors.New("invalid checksum") diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index e85409fe19f137b90f6801c537eeee8927b82fb8..bc140c31a6b215343ba6744d69709a654e4fbc95 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -8,7 +8,8 @@ import ( cortex_client "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" - "github.com/cortexproject/cortex/pkg/util" + cortex_util "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/validation" opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -17,6 +18,7 @@ import ( "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/util" ) var ( @@ -56,11 +58,12 @@ type Distributor struct { cfg Config clientCfg client.Config ring ring.ReadRing + overrides *validation.Overrides pool *cortex_client.Pool } // New a distributor creates. -func New(cfg Config, clientCfg client.Config, ring ring.ReadRing) (*Distributor, error) { +func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, overrides *validation.Overrides) (*Distributor, error) { factory := func(addr string) (grpc_health_v1.HealthClient, error) { return client.New(clientCfg, addr) } @@ -69,7 +72,8 @@ func New(cfg Config, clientCfg client.Config, ring ring.ReadRing) (*Distributor, cfg: cfg, clientCfg: clientCfg, ring: ring, - pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, util.Logger), + overrides: overrides, + pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, cortex_util.Logger), }, nil } @@ -114,7 +118,13 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log // We also work out the hash value at the same time. streams := make([]streamTracker, len(req.Streams)) keys := make([]uint32, 0, len(req.Streams)) + var validationErr error for i, stream := range req.Streams { + if err := d.validateLabels(userID, stream.Labels); err != nil { + validationErr = err + continue + } + keys = append(keys, tokenFor(userID, stream.Labels)) streams[i].stream = stream } @@ -160,10 +170,19 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log case err := <-tracker.err: return nil, err case <-tracker.done: - return &logproto.PushResponse{}, nil + return &logproto.PushResponse{}, validationErr } } +func (d *Distributor) validateLabels(userID, labels string) error { + ls, err := util.ToClientLabels(labels) + if err != nil { + return err + } + + return d.overrides.ValidateLabels(userID, ls) +} + // TODO taken from Cortex, see if we can refactor out an usable interface. func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) { err := d.sendSamplesErr(ctx, ingester, streamTrackers) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index d5e405afdf02498185d42a291ea1919384d58402..502fee4c019ca5e874ea14f4264618a8bdb74460 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -12,13 +12,13 @@ import ( "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ingester/index" - "github.com/cortexproject/cortex/pkg/util/wire" "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/parser" "github.com/grafana/loki/pkg/querier" + "github.com/grafana/loki/pkg/util" ) const queryBatchSize = 128 @@ -67,10 +67,12 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { i.streamsMtx.Lock() defer i.streamsMtx.Unlock() + var appendErr error for _, s := range req.Streams { - labels, err := toClientLabels(s.Labels) + labels, err := util.ToClientLabels(s.Labels) if err != nil { - return err + appendErr = err + continue } fp := client.FastFingerprint(labels) @@ -83,11 +85,12 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { } if err := stream.Push(ctx, s.Entries); err != nil { - return err + appendErr = err + continue } } - return nil + return appendErr } func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error { @@ -183,19 +186,3 @@ func sendBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer, } return nil } - -func toClientLabels(labels string) ([]client.LabelPair, error) { - ls, err := parser.Labels(labels) - if err != nil { - return nil, err - } - - pairs := make([]client.LabelPair, 0, len(ls)) - for i := 0; i < len(ls); i++ { - pairs = append(pairs, client.LabelPair{ - Name: wire.Bytes(ls[i].Name), - Value: wire.Bytes(ls[i].Value), - }) - } - return pairs, nil -} diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index f0d34e4564cb31331b51767bfc7306814578ab77..18393b664974f71aa2a5208a0c4c9ddc47923025 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -69,6 +69,9 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error { chunksCreatedTotal.Inc() } + // Don't fail on the first append error - if samples are sent out of order, + // we still want to append the later ones. + var appendErr error for i := range entries { if s.chunks[0].closed || !s.chunks[0].chunk.SpaceFor(&entries[i]) { samplesPerChunk.Observe(float64(s.chunks[0].chunk.Size())) @@ -78,11 +81,11 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error { chunksCreatedTotal.Inc() } if err := s.chunks[len(s.chunks)-1].chunk.Append(&entries[i]); err != nil { - return err + appendErr = err } } - return nil + return appendErr } // Returns an iterator. diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index e215b3ad300395375ce28a5d2ee8ee8b3a8c21f1..ce08d4b071b5046edd40edbc36e1aeaf0267c572 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -62,6 +62,7 @@ type Loki struct { server *server.Server ring *ring.Ring + overrides *validation.Overrides distributor *distributor.Distributor ingester *ingester.Ingester querier *querier.Querier diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 81ea6f0abe0950c16901aca2f41a9af33e42292d..ec2c0e2f575788818ae8d71272e4cf3bcb63bb2b 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -26,6 +26,7 @@ type moduleName int // The various modules that make up Loki. const ( Ring moduleName = iota + Overrides Server Distributor Ingester @@ -38,6 +39,8 @@ func (m moduleName) String() string { switch m { case Ring: return "ring" + case Overrides: + return "overrides" case Server: return "server" case Distributor: @@ -60,6 +63,9 @@ func (m *moduleName) Set(s string) error { case "ring": *m = Ring return nil + case "overrides": + *m = Overrides + return nil case "server": *m = Server return nil @@ -97,8 +103,13 @@ func (t *Loki) initRing() (err error) { return } +func (t *Loki) initOverrides() (err error) { + t.overrides, err = validation.NewOverrides(t.cfg.LimitsConfig) + return err +} + func (t *Loki) initDistributor() (err error) { - t.distributor, err = distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring) + t.distributor, err = distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring, t.overrides) if err != nil { return } @@ -158,13 +169,7 @@ func (t *Loki) stopIngester() error { } func (t *Loki) initStore() (err error) { - var overrides *validation.Overrides - overrides, err = validation.NewOverrides(t.cfg.LimitsConfig) - if err != nil { - return err - } - - t.store, err = storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, overrides) + t.store, err = storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides) return } @@ -189,12 +194,17 @@ var modules = map[moduleName]module{ init: (*Loki).initRing, }, + Overrides: { + init: (*Loki).initOverrides, + }, + Distributor: { - deps: []moduleName{Ring, Server}, + deps: []moduleName{Ring, Server, Overrides}, init: (*Loki).initDistributor, }, Store: { + deps: []moduleName{Overrides}, init: (*Loki).initStore, stop: (*Loki).stopStore, }, diff --git a/pkg/util/conv.go b/pkg/util/conv.go new file mode 100644 index 0000000000000000000000000000000000000000..b44bed5ce77ac7eb86cc6733f18871252e77745a --- /dev/null +++ b/pkg/util/conv.go @@ -0,0 +1,24 @@ +package util + +import ( + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util/wire" + "github.com/grafana/loki/pkg/parser" +) + +// ToClientLabels parses the labels and converts them to the Cortex type. +func ToClientLabels(labels string) ([]client.LabelPair, error) { + ls, err := parser.Labels(labels) + if err != nil { + return nil, err + } + + pairs := make([]client.LabelPair, 0, len(ls)) + for i := 0; i < len(ls); i++ { + pairs = append(pairs, client.LabelPair{ + Name: wire.Bytes(ls[i].Name), + Value: wire.Bytes(ls[i].Value), + }) + } + return pairs, nil +}