diff --git a/cmd/loki/loki-local-config.yaml b/cmd/loki/loki-local-config.yaml
index ad31034e77031776add284e06c392a31046201aa..24d7a8344a5286d418473cecbe6f0495469fb398 100644
--- a/cmd/loki/loki-local-config.yaml
+++ b/cmd/loki/loki-local-config.yaml
@@ -9,6 +9,7 @@ ingester:
     ring:
       store: inmemory
       replication_factor: 1
+  chunk_idle_period: 15m
 
 schema_config:
   configs:
@@ -28,4 +29,4 @@ storage_config:
     directory: /tmp/loki/chunks
 
 limits_config:
-  enforce_metric_name: false
\ No newline at end of file
+  enforce_metric_name: false
diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go
index 8e6b8aab3201a03cc6cda0f05bf6d33015b89943..9e6bb9bd3d8ff2391db1820b062904d394f0d01f 100644
--- a/pkg/ingester/flush.go
+++ b/pkg/ingester/flush.go
@@ -81,7 +81,12 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
 }
 
 func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate bool) {
-	if len(stream.chunks) <= 1 && !immediate {
+	if len(stream.chunks) == 0 {
+		return
+	}
+
+	lastChunk := stream.chunks[len(stream.chunks)-1]
+	if len(stream.chunks) == 1 && time.Since(lastChunk.lastUpdated) < i.cfg.MaxChunkIdle && !immediate {
 		return
 	}
 
@@ -158,27 +163,31 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint
 		return nil, nil
 	}
 
-	if len(stream.chunks) < 2 && !immediate {
-		return nil, nil
+	var result []*chunkDesc
+	for j := range stream.chunks {
+		if immediate || i.shouldFlushChunk(&stream.chunks[j]) {
+			result = append(result, &stream.chunks[j])
+		}
 	}
+	return result, stream.labels
+}
 
-	var chunks []*chunkDesc
-	lastIndex := len(stream.chunks)
-	if !immediate {
-		lastIndex--
+func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) bool {
+	if !chunk.flushed.IsZero() {
+		return false
 	}
-	for i := 0; i < lastIndex; i++ {
-		// Ensure no more writes happen to this chunk.
-		if !stream.chunks[i].closed {
-			stream.chunks[i].closed = true
-		}
-		// Flush this chunk if it hasn't already been successfully flushed.
-		if stream.chunks[i].flushed.IsZero() {
-			chunks = append(chunks, &stream.chunks[i])
-		}
+
+	// Append should close the chunk when a new one is added.
+	if chunk.closed {
+		return true
+	}
+
+	if time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle {
+		chunk.closed = true
+		return true
 	}
 
-	return chunks, stream.labels
+	return false
 }
 
 func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {
diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..3f15fa3a516bb5aca04990bb01406be56a80369d
--- /dev/null
+++ b/pkg/ingester/flush_test.go
@@ -0,0 +1,188 @@
+package ingester
+
+import (
+	"fmt"
+	"os"
+	"sort"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/cortexproject/cortex/pkg/chunk"
+	"github.com/cortexproject/cortex/pkg/ring"
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/cortexproject/cortex/pkg/util/flagext"
+	"github.com/go-kit/kit/log"
+	"github.com/grafana/loki/pkg/chunkenc"
+	"github.com/grafana/loki/pkg/logproto"
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/require"
+	"github.com/weaveworks/common/user"
+	"golang.org/x/net/context"
+)
+
+const (
+	numSeries        = 10
+	samplesPerSeries = 100
+)
+
+func init() {
+	util.Logger = log.NewLogfmtLogger(os.Stdout)
+}
+
+func TestChunkFlushingIdle(t *testing.T) {
+	cfg := defaultIngesterTestConfig()
+	cfg.FlushCheckPeriod = 20 * time.Millisecond
+	cfg.MaxChunkIdle = 100 * time.Millisecond
+	cfg.RetainPeriod = 500 * time.Millisecond
+
+	store, ing := newTestStore(t, cfg)
+	userIDs, testData := pushTestSamples(t, ing)
+
+	// wait beyond idle time so samples flush
+	time.Sleep(cfg.MaxChunkIdle * 2)
+	store.checkData(t, userIDs, testData)
+}
+
+func TestChunkFlushingShutdown(t *testing.T) {
+	store, ing := newTestStore(t, defaultIngesterTestConfig())
+	userIDs, testData := pushTestSamples(t, ing)
+	ing.Shutdown()
+	store.checkData(t, userIDs, testData)
+}
+
+type testStore struct {
+	mtx sync.Mutex
+	// Chunks keyed by userID.
+	chunks map[string][]chunk.Chunk
+}
+
+func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {
+	store := &testStore{
+		chunks: map[string][]chunk.Chunk{},
+	}
+
+	ing, err := New(cfg, store)
+	require.NoError(t, err)
+
+	return store, ing
+}
+
+func newDefaultTestStore(t require.TestingT) (*testStore, *Ingester) {
+	return newTestStore(t, defaultIngesterTestConfig())
+}
+
+func defaultIngesterTestConfig() Config {
+	consul := ring.NewInMemoryKVClient()
+	cfg := Config{}
+	flagext.DefaultValues(&cfg)
+	cfg.FlushCheckPeriod = 99999 * time.Hour
+	cfg.MaxChunkIdle = 99999 * time.Hour
+	cfg.ConcurrentFlushes = 1
+	cfg.LifecyclerConfig.RingConfig.Mock = consul
+	cfg.LifecyclerConfig.NumTokens = 1
+	cfg.LifecyclerConfig.ListenPort = func(i int) *int { return &i }(0)
+	cfg.LifecyclerConfig.Addr = "localhost"
+	cfg.LifecyclerConfig.ID = "localhost"
+	return cfg
+}
+
+func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	userID, err := user.ExtractOrgID(ctx)
+	if err != nil {
+		return err
+	}
+	for _, chunk := range chunks {
+		for k, v := range chunk.Metric {
+			if v == "" {
+				return fmt.Errorf("Chunk has blank label %q", k)
+			}
+		}
+	}
+	s.chunks[userID] = append(s.chunks[userID], chunks...)
+	return nil
+}
+
+func (s *testStore) Stop() {}
+
+func pushTestSamples(t *testing.T, ing *Ingester) ([]string, map[string][]*logproto.Stream) {
+	userIDs := []string{"1", "2", "3"}
+
+	// Create test samples.
+	testData := map[string][]*logproto.Stream{}
+	for i, userID := range userIDs {
+		testData[userID] = buildTestStreams(i)
+	}
+
+	// Append samples.
+	for _, userID := range userIDs {
+		ctx := user.InjectOrgID(context.Background(), userID)
+		_, err := ing.Push(ctx, &logproto.PushRequest{
+			Streams: testData[userID],
+		})
+		require.NoError(t, err)
+	}
+	return userIDs, testData
+}
+
+func buildTestStreams(offset int) []*logproto.Stream {
+	var m []*logproto.Stream
+	for i := 0; i < numSeries; i++ {
+		ss := logproto.Stream{
+			Labels: model.Metric{
+				"name":         model.LabelValue(fmt.Sprintf("testmetric_%d", i)),
+				model.JobLabel: "testjob",
+			}.String(),
+		}
+		for j := 0; j < samplesPerSeries; j++ {
+			ss.Entries = append(ss.Entries, logproto.Entry{
+				Timestamp: time.Unix(int64(i+j+offset), 0),
+				Line:      "line",
+			})
+		}
+		m = append(m, &ss)
+	}
+
+	sort.Slice(m, func(i, j int) bool {
+		return m[i].Labels < m[j].Labels
+	})
+
+	return m
+}
+
+// check that the store is holding data equivalent to what we expect
+func (s *testStore) checkData(t *testing.T, userIDs []string, testData map[string][]*logproto.Stream) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+	for _, userID := range userIDs {
+		chunks := s.chunks[userID]
+		streams := []*logproto.Stream{}
+		for _, chunk := range chunks {
+			lokiChunk := chunk.Data.(*chunkenc.Facade).LokiChunk()
+			delete(chunk.Metric, nameLabel)
+			labels := chunk.Metric.String()
+			streams = append(streams, buildStreamsFromChunk(t, labels, lokiChunk))
+		}
+		sort.Slice(streams, func(i, j int) bool {
+			return streams[i].Labels < streams[j].Labels
+		})
+		require.Equal(t, testData[userID], streams)
+	}
+}
+
+func buildStreamsFromChunk(t *testing.T, labels string, chk chunkenc.Chunk) *logproto.Stream {
+	it, err := chk.Iterator(time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD)
+	require.NoError(t, err)
+
+	stream := &logproto.Stream{
+		Labels: labels,
+	}
+	for it.Next() {
+		stream.Entries = append(stream.Entries, it.Entry())
+	}
+	require.NoError(t, it.Error())
+	return stream
+}
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index aff429a3c3d452a35784cdafa33ebbe7425114a3..9fd5b88739019abdc089dc31e4b984daee1514a2 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -31,6 +31,7 @@ type Config struct {
 	FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
 	FlushOpTimeout   time.Duration `yaml:"flush_op_timeout"`
 	RetainPeriod     time.Duration `yaml:"chunk_retain_period"`
+	MaxChunkIdle     time.Duration `yaml:"chunk_idle_period"`
 }
 
 // RegisterFlags registers the flags.
@@ -41,6 +42,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "")
 	f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Second, "")
 	f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 15*time.Minute, "")
+	f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "")
 }
 
 // Ingester builds chunks for incoming log streams.
@@ -51,7 +53,7 @@ type Ingester struct {
 	instances    map[string]*instance
 
 	lifecycler *ring.Lifecycler
-	store      chunk.Store
+	store      ChunkStore
 
 	done sync.WaitGroup
 	quit chan struct{}
@@ -62,8 +64,13 @@ type Ingester struct {
 	flushQueuesDone sync.WaitGroup
 }
 
+// ChunkStore is the interface we need to store chunks.
+type ChunkStore interface {
+	Put(ctx context.Context, chunks []chunk.Chunk) error
+}
+
 // New makes a new Ingester.
-func New(cfg Config, store chunk.Store) (*Ingester, error) {
+func New(cfg Config, store ChunkStore) (*Ingester, error) {
 	i := &Ingester{
 		cfg:       cfg,
 		instances: map[string]*instance{},
diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go
index 18393b664974f71aa2a5208a0c4c9ddc47923025..35c6d539a7ec4af26e4cbcc3d0eed4907eab9f7b 100644
--- a/pkg/ingester/stream.go
+++ b/pkg/ingester/stream.go
@@ -52,6 +52,8 @@ type chunkDesc struct {
 	chunk   chunkenc.Chunk
 	closed  bool
 	flushed time.Time
+
+	lastUpdated time.Time
 }
 
 func newStream(fp model.Fingerprint, labels []client.LabelPair) *stream {
@@ -73,16 +75,22 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
 	// we still want to append the later ones.
 	var appendErr error
 	for i := range entries {
-		if s.chunks[0].closed || !s.chunks[0].chunk.SpaceFor(&entries[i]) {
-			samplesPerChunk.Observe(float64(s.chunks[0].chunk.Size()))
+		chunk := &s.chunks[len(s.chunks)-1]
+		if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) {
+			chunk.closed = true
+
+			samplesPerChunk.Observe(float64(chunk.chunk.Size()))
+			chunksCreatedTotal.Inc()
+
 			s.chunks = append(s.chunks, chunkDesc{
 				chunk: chunkenc.NewMemChunk(chunkenc.EncGZIP),
 			})
-			chunksCreatedTotal.Inc()
+			chunk = &s.chunks[len(s.chunks)-1]
 		}
-		if err := s.chunks[len(s.chunks)-1].chunk.Append(&entries[i]); err != nil {
+		if err := chunk.chunk.Append(&entries[i]); err != nil {
 			appendErr = err
 		}
+		chunk.lastUpdated = entries[i].Timestamp
 	}
 
 	return appendErr
diff --git a/production/helm/templates/loki/configmap.yaml b/production/helm/templates/loki/configmap.yaml
index f7eba41fd498a9053d574d790dd3ba1cae94db57..2eff8df8131e6ec889a16bace472e7ac3b45e4f6 100644
--- a/production/helm/templates/loki/configmap.yaml
+++ b/production/helm/templates/loki/configmap.yaml
@@ -22,6 +22,7 @@ data:
         ring:
           store: {{ .Values.loki.config.ingester.lifecycler.ring.store }}
          replication_factor: {{ .Values.loki.config.ingester.lifecycler.ring.replication_factor }}
+      chunk_idle_period: 15m
 
 {{- if .Values.loki.config.schema_configs }}
     schema_config:
diff --git a/production/ksonnet/loki/config.libsonnet b/production/ksonnet/loki/config.libsonnet
index 081f8c11ad4f46953af28cdb1671096b228ccdc0..2f3481b5b4ce9a092470412b7c905f8ae476c8fd 100644
--- a/production/ksonnet/loki/config.libsonnet
+++ b/production/ksonnet/loki/config.libsonnet
@@ -48,6 +48,8 @@
         },
       },
 
+      chunk_idle_period: '15m',
+
       num_tokens: 512,
       heartbeat_period: '5s',
       join_after: '10s',
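
For context, the fragment below sketches how an operator might set the new option; it is illustrative only, assembled from keys that appear in this diff (the inmemory ring from cmd/loki/loki-local-config.yaml, the new chunk_idle_period, and the pre-existing chunk_retain_period from the ingester Config struct), and is not a complete Loki configuration. The same value can also be set on the command line with the new -ingester.chunks-idle-period flag, which defaults to 30m.

# Illustrative ingester fragment, not a full config.
ingester:
  lifecycler:
    ring:
      store: inmemory
      replication_factor: 1
  chunk_idle_period: 15m    # new: flush a chunk once it has gone 15m without new entries (Config.MaxChunkIdle)
  chunk_retain_period: 15m  # existing: how long flushed chunks are retained in memory (Config.RetainPeriod)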