Skip to content
Snippets Groups Projects
Commit e1c5c4be authored by Tom Wilkie's avatar Tom Wilkie Committed by Tom Wilkie
Browse files

Basic distributor & ingester tests.


Proves that they can accept multiple samples with the same timestamp.

Signed-off-by: default avatarTom Wilkie <tom.wilkie@gmail.com>
parent 58d2d217
No related branches found
No related tags found
No related merge requests found
......@@ -47,6 +47,8 @@ var (
// Config for a Distributor.
type Config struct {
// For testing.
factory func(addr string) (grpc_health_v1.HealthClient, error)
}
// RegisterFlags registers the flags.
......@@ -64,8 +66,11 @@ type Distributor struct {
// New a distributor creates.
func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, overrides *validation.Overrides) (*Distributor, error) {
factory := func(addr string) (grpc_health_v1.HealthClient, error) {
return client.New(clientCfg, addr)
factory := cfg.factory
if factory == nil {
factory = func(addr string) (grpc_health_v1.HealthClient, error) {
return client.New(clientCfg, addr)
}
}
return &Distributor{
......@@ -132,7 +137,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
if len(streams) == 0 {
return &logproto.PushResponse{}, nil
return &logproto.PushResponse{}, validationErr
}
replicationSets, err := d.ring.BatchGet(keys, ring.Write)
......
package distributor
import (
"context"
"fmt"
"testing"
"time"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
)
const numIngesters = 5
func TestDistributor(t *testing.T) {
var (
distributorConfig Config
defaultLimits validation.Limits
clientConfig client.Config
)
flagext.DefaultValues(&distributorConfig, &defaultLimits, &clientConfig)
defaultLimits.EnforceMetricName = false
limits, err := validation.NewOverrides(defaultLimits)
require.NoError(t, err)
ingesters := map[string]*mockIngester{}
for i := 0; i < numIngesters; i++ {
ingesters[fmt.Sprintf("ingester%d", i)] = &mockIngester{}
}
r := &mockRing{
replicationFactor: 3,
}
for addr := range ingesters {
r.ingesters = append(r.ingesters, ring.IngesterDesc{
Addr: addr,
})
}
distributorConfig.factory = func(addr string) (grpc_health_v1.HealthClient, error) {
return ingesters[addr], nil
}
d, err := New(distributorConfig, clientConfig, r, limits)
require.NoError(t, err)
req := logproto.PushRequest{
Streams: []*logproto.Stream{
{
Labels: `{foo="bar"}`,
},
},
}
for i := 0; i < 10; i++ {
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
}
_, err = d.Push(user.InjectOrgID(context.Background(), "test"), &req)
require.NoError(t, err)
}
type mockIngester struct {
grpc_health_v1.HealthClient
logproto.PusherClient
}
func (i *mockIngester) Push(ctx context.Context, in *logproto.PushRequest, opts ...grpc.CallOption) (*logproto.PushResponse, error) {
return nil, nil
}
// Copied from Cortex; TODO(twilkie) - factor this our and share it.
// mockRing doesn't do virtual nodes, just returns mod(key) + replicationFactor
// ingesters.
type mockRing struct {
prometheus.Counter
ingesters []ring.IngesterDesc
replicationFactor uint32
}
func (r mockRing) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error) {
result := ring.ReplicationSet{
MaxErrors: 1,
}
for i := uint32(0); i < r.replicationFactor; i++ {
n := (key + i) % uint32(len(r.ingesters))
result.Ingesters = append(result.Ingesters, r.ingesters[n])
}
return result, nil
}
func (r mockRing) BatchGet(keys []uint32, op ring.Operation) ([]ring.ReplicationSet, error) {
result := []ring.ReplicationSet{}
for i := 0; i < len(keys); i++ {
rs, err := r.Get(keys[i], op)
if err != nil {
return nil, err
}
result = append(result, rs)
}
return result, nil
}
func (r mockRing) GetAll() (ring.ReplicationSet, error) {
return ring.ReplicationSet{
Ingesters: r.ingesters,
MaxErrors: 1,
}, nil
}
func (r mockRing) ReplicationFactor() int {
return int(r.replicationFactor)
}
package ingester
import (
"fmt"
"sync"
"testing"
"time"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"golang.org/x/net/context"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/flagext"
)
func TestIngester(t *testing.T) {
var ingesterConfig Config
flagext.DefaultValues(&ingesterConfig)
ingesterConfig.LifecyclerConfig.RingConfig.Mock = ring.NewInMemoryKVClient()
ingesterConfig.LifecyclerConfig.NumTokens = 1
ingesterConfig.LifecyclerConfig.ListenPort = func(i int) *int { return &i }(0)
ingesterConfig.LifecyclerConfig.Addr = "localhost"
ingesterConfig.LifecyclerConfig.ID = "localhost"
store := &mockStore{
chunks: map[string][]chunk.Chunk{},
}
i, err := New(ingesterConfig, store)
require.NoError(t, err)
defer i.Shutdown()
req := logproto.PushRequest{
Streams: []*logproto.Stream{
{
Labels: `{foo="bar"}`,
},
},
}
for i := 0; i < 10; i++ {
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
}
_, err = i.Push(user.InjectOrgID(context.Background(), "test"), &req)
require.NoError(t, err)
}
type mockStore struct {
mtx sync.Mutex
chunks map[string][]chunk.Chunk
}
func (s *mockStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
s.mtx.Lock()
defer s.mtx.Unlock()
userid, err := user.ExtractOrgID(ctx)
if err != nil {
return err
}
s.chunks[userid] = append(s.chunks[userid], chunks...)
return nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment