diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index e22f0045f3fa8486c7b659b16ef187b66ac72052..d50186c35667b572cb813e67add1f0f8536b07b4 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -81,16 +81,12 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) { } func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate bool) { - old := false - if len(stream.chunks) == 1 { - chunk := stream.chunks[0] - if i.cfg.MaxChunkIdle > 0 && - time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle { - old = true - } + if len(stream.chunks) == 0 { + return } - if !old && len(stream.chunks) <= 1 && !immediate { + lastChunk := stream.chunks[len(stream.chunks)-1] + if time.Since(lastChunk.lastUpdated) < i.cfg.MaxChunkIdle && len(stream.chunks) <= 1 && !immediate { return } @@ -171,13 +167,11 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint if len(stream.chunks) == 1 { chunk := &stream.chunks[0] - if i.cfg.MaxChunkIdle > 0 && - time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle { + if time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle && + chunk.flushed.IsZero() { chunk.closed = true - if chunk.flushed.IsZero() { - return []*chunkDesc{chunk}, stream.labels - } + return []*chunkDesc{chunk}, stream.labels } } diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 2af46ffa192b60b7000890086ab7da8409ff9cac..1b497f6e6fa62b01d29bc13e911f81aa5cd7aa05 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -97,7 +97,7 @@ func (s *testStore) checkData(t *testing.T, userIDs []string, testData map[strin streams = append(streams, stream) } sort.Slice(streams, func(i, j int) bool { - return streams[i].Labels < streams[i].Labels + return streams[i].Labels < streams[j].Labels }) require.Equal(t, testData[userID], streams) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 9fd5b88739019abdc089dc31e4b984daee1514a2..e678f3e819812b6daadfb2f7b8fd50e55c6a3e5f 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -71,6 +71,10 @@ type ChunkStore interface { // New makes a new Ingester. func New(cfg Config, store ChunkStore) (*Ingester, error) { + if cfg.MaxChunkIdle == 0 { + cfg.MaxChunkIdle = 30 * time.Minute + } + i := &Ingester{ cfg: cfg, instances: map[string]*instance{},