diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 30f05c7ee382fcca6c5b3454f52ef5c2bfc12982..d3e0689a13da4039a62954bcf7efa8aba31f732b 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -47,6 +47,8 @@ var ( // Config for a Distributor. type Config struct { + // For testing. + factory func(addr string) (grpc_health_v1.HealthClient, error) } // RegisterFlags registers the flags. @@ -64,8 +66,11 @@ type Distributor struct { // New a distributor creates. func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, overrides *validation.Overrides) (*Distributor, error) { - factory := func(addr string) (grpc_health_v1.HealthClient, error) { - return client.New(clientCfg, addr) + factory := cfg.factory + if factory == nil { + factory = func(addr string) (grpc_health_v1.HealthClient, error) { + return client.New(clientCfg, addr) + } } return &Distributor{ @@ -132,7 +137,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log } if len(streams) == 0 { - return &logproto.PushResponse{}, nil + return &logproto.PushResponse{}, validationErr } replicationSets, err := d.ring.BatchGet(keys, ring.Write) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go new file mode 100644 index 0000000000000000000000000000000000000000..4cc926157a8fbc4b5f997137a331ddaff14df6ac --- /dev/null +++ b/pkg/distributor/distributor_test.go @@ -0,0 +1,125 @@ +package distributor + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/validation" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/grafana/loki/pkg/ingester/client" + "github.com/grafana/loki/pkg/logproto" +) + +const numIngesters = 5 + +func TestDistributor(t *testing.T) { + var ( + distributorConfig Config + defaultLimits validation.Limits + clientConfig client.Config + ) + flagext.DefaultValues(&distributorConfig, &defaultLimits, &clientConfig) + defaultLimits.EnforceMetricName = false + + limits, err := validation.NewOverrides(defaultLimits) + require.NoError(t, err) + + ingesters := map[string]*mockIngester{} + for i := 0; i < numIngesters; i++ { + ingesters[fmt.Sprintf("ingester%d", i)] = &mockIngester{} + } + + r := &mockRing{ + replicationFactor: 3, + } + for addr := range ingesters { + r.ingesters = append(r.ingesters, ring.IngesterDesc{ + Addr: addr, + }) + } + + distributorConfig.factory = func(addr string) (grpc_health_v1.HealthClient, error) { + return ingesters[addr], nil + } + + d, err := New(distributorConfig, clientConfig, r, limits) + require.NoError(t, err) + + req := logproto.PushRequest{ + Streams: []*logproto.Stream{ + { + Labels: `{foo="bar"}`, + }, + }, + } + for i := 0; i < 10; i++ { + req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: fmt.Sprintf("line %d", i), + }) + } + + _, err = d.Push(user.InjectOrgID(context.Background(), "test"), &req) + require.NoError(t, err) +} + +type mockIngester struct { + grpc_health_v1.HealthClient + logproto.PusherClient +} + +func (i *mockIngester) Push(ctx context.Context, in *logproto.PushRequest, opts ...grpc.CallOption) (*logproto.PushResponse, error) { + return nil, nil +} + +// Copied from Cortex; TODO(twilkie) - factor this our and share it. +// mockRing doesn't do virtual nodes, just returns mod(key) + replicationFactor +// ingesters. +type mockRing struct { + prometheus.Counter + ingesters []ring.IngesterDesc + replicationFactor uint32 +} + +func (r mockRing) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error) { + result := ring.ReplicationSet{ + MaxErrors: 1, + } + for i := uint32(0); i < r.replicationFactor; i++ { + n := (key + i) % uint32(len(r.ingesters)) + result.Ingesters = append(result.Ingesters, r.ingesters[n]) + } + return result, nil +} + +func (r mockRing) BatchGet(keys []uint32, op ring.Operation) ([]ring.ReplicationSet, error) { + result := []ring.ReplicationSet{} + for i := 0; i < len(keys); i++ { + rs, err := r.Get(keys[i], op) + if err != nil { + return nil, err + } + result = append(result, rs) + } + return result, nil +} + +func (r mockRing) GetAll() (ring.ReplicationSet, error) { + return ring.ReplicationSet{ + Ingesters: r.ingesters, + MaxErrors: 1, + }, nil +} + +func (r mockRing) ReplicationFactor() int { + return int(r.replicationFactor) +} diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2c6ca1b6de377dfc69d542bc9db707aa57401e26 --- /dev/null +++ b/pkg/ingester/ingester_test.go @@ -0,0 +1,70 @@ +package ingester + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/grafana/loki/pkg/logproto" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + "golang.org/x/net/context" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util/flagext" +) + +func TestIngester(t *testing.T) { + var ingesterConfig Config + flagext.DefaultValues(&ingesterConfig) + ingesterConfig.LifecyclerConfig.RingConfig.Mock = ring.NewInMemoryKVClient() + ingesterConfig.LifecyclerConfig.NumTokens = 1 + ingesterConfig.LifecyclerConfig.ListenPort = func(i int) *int { return &i }(0) + ingesterConfig.LifecyclerConfig.Addr = "localhost" + ingesterConfig.LifecyclerConfig.ID = "localhost" + + store := &mockStore{ + chunks: map[string][]chunk.Chunk{}, + } + + i, err := New(ingesterConfig, store) + require.NoError(t, err) + defer i.Shutdown() + + req := logproto.PushRequest{ + Streams: []*logproto.Stream{ + { + Labels: `{foo="bar"}`, + }, + }, + } + for i := 0; i < 10; i++ { + req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: fmt.Sprintf("line %d", i), + }) + } + + _, err = i.Push(user.InjectOrgID(context.Background(), "test"), &req) + require.NoError(t, err) +} + +type mockStore struct { + mtx sync.Mutex + chunks map[string][]chunk.Chunk +} + +func (s *mockStore) Put(ctx context.Context, chunks []chunk.Chunk) error { + s.mtx.Lock() + defer s.mtx.Unlock() + + userid, err := user.ExtractOrgID(ctx) + if err != nil { + return err + } + + s.chunks[userid] = append(s.chunks[userid], chunks...) + return nil +}