diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 7bdeae54933df725d705f838b4e1d8782c7258f2..d056d25d042a4f42c972f41b57d98d2c3ea381bc 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -196,17 +196,20 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint var result []*chunkDesc for j := range stream.chunks { if immediate || i.shouldFlushChunk(&stream.chunks[j]) { - result = append(result, &stream.chunks[j]) + // Ensure no more writes happen to this chunk. + if !stream.chunks[j].closed { + stream.chunks[j].closed = true + } + // Flush this chunk if it hasn't already been successfully flushed. + if stream.chunks[j].flushed.IsZero() { + result = append(result, &stream.chunks[j]) + } } } return result, stream.labels } func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) bool { - if !chunk.flushed.IsZero() { - return false - } - // Append should close the chunk when the a new one is added. if chunk.closed { return true