diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index d50186c35667b572cb813e67add1f0f8536b07b4..9e6bb9bd3d8ff2391db1820b062904d394f0d01f 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -86,7 +86,7 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo } lastChunk := stream.chunks[len(stream.chunks)-1] - if time.Since(lastChunk.lastUpdated) < i.cfg.MaxChunkIdle && len(stream.chunks) <= 1 && !immediate { + if len(stream.chunks) == 1 && time.Since(lastChunk.lastUpdated) < i.cfg.MaxChunkIdle && !immediate { return } @@ -163,39 +163,31 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint return nil, nil } - // Flush a chunk after it received no samples for a long time. - if len(stream.chunks) == 1 { - chunk := &stream.chunks[0] - - if time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle && - chunk.flushed.IsZero() { - chunk.closed = true - - return []*chunkDesc{chunk}, stream.labels + var result []*chunkDesc + for j := range stream.chunks { + if immediate || i.shouldFlushChunk(&stream.chunks[j]) { + result = append(result, &stream.chunks[j]) } } + return result, stream.labels +} - if len(stream.chunks) < 2 && !immediate { - return nil, nil +func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) bool { + if !chunk.flushed.IsZero() { + return false } - var chunks []*chunkDesc - lastIndex := len(stream.chunks) - if !immediate { - lastIndex-- + // Append should close the chunk when a new one is added. + if chunk.closed { + return true } - for i := 0; i < lastIndex; i++ { - // Ensure no more writes happen to this chunk. - if !stream.chunks[i].closed { - stream.chunks[i].closed = true - } - // Flush this chunk if it hasn't already been successfully flushed. 
- if stream.chunks[i].flushed.IsZero() { - chunks = append(chunks, &stream.chunks[i]) - } + + if time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle { + chunk.closed = true + return true } - return chunks, stream.labels + return false } func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) { diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 1b497f6e6fa62b01d29bc13e911f81aa5cd7aa05..e8348990e705ed3a0878c8463cf92a45d091fe35 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -2,6 +2,7 @@ package ingester import ( "fmt" + "os" "sort" "sync" "testing" @@ -9,7 +10,9 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/go-kit/kit/log" "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/logproto" "github.com/prometheus/common/model" @@ -18,6 +21,29 @@ import ( "golang.org/x/net/context" ) +const ( + numSeries = 10 + samplesPerSeries = 100 +) + +func init() { + util.Logger = log.NewLogfmtLogger(os.Stdout) +} + +func TestChunkFlushingIdle(t *testing.T) { + cfg := defaultIngesterTestConfig() + cfg.FlushCheckPeriod = 20 * time.Millisecond + cfg.MaxChunkIdle = 100 * time.Millisecond + cfg.RetainPeriod = 500 * time.Millisecond + + store, ing := newTestStore(t, cfg) + userIDs, testData := pushTestSamples(t, ing) + + // wait beyond idle time so samples flush + time.Sleep(cfg.MaxChunkIdle * 2) + store.checkData(t, userIDs, testData) +} + type testStore struct { mtx sync.Mutex // Chunks keyed by userID. 
@@ -36,9 +62,7 @@ func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) { } func newDefaultTestStore(t require.TestingT) (*testStore, *Ingester) { - return newTestStore(t, - defaultIngesterTestConfig(), - ) + return newTestStore(t, defaultIngesterTestConfig()) } func defaultIngesterTestConfig() Config { @@ -77,59 +101,36 @@ func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error { func (s *testStore) Stop() {} -// check that the store is holding data equivalent to what we expect -func (s *testStore) checkData(t *testing.T, userIDs []string, testData map[string][]*logproto.Stream) { - s.mtx.Lock() - defer s.mtx.Unlock() - for _, userID := range userIDs { - chunks := make([]chunkenc.Chunk, 0, len(s.chunks[userID])) - labels := make([]string, 0, len(s.chunks[userID])) - for _, chk := range s.chunks[userID] { - chunks = append(chunks, chk.Data.(*chunkenc.Facade).LokiChunk()) - - delete(chk.Metric, nameLabel) - labels = append(labels, chk.Metric.String()) - } - - streams := make([]*logproto.Stream, 0, len(chunks)) - for i, chk := range chunks { - stream := buildStreamsFromChunk(t, labels[i], chk) - streams = append(streams, stream) - } - sort.Slice(streams, func(i, j int) bool { - return streams[i].Labels < streams[j].Labels - }) +func pushTestSamples(t *testing.T, ing *Ingester) ([]string, map[string][]*logproto.Stream) { + userIDs := []string{"1", "2", "3"} - require.Equal(t, testData[userID], streams) + // Create test samples. + testData := map[string][]*logproto.Stream{} + for i, userID := range userIDs { + testData[userID] = buildTestStreams(i) } -} - -func buildStreamsFromChunk(t *testing.T, labels string, chk chunkenc.Chunk) *logproto.Stream { - //start, end := chk.Bounds() - it, err := chk.Iterator(time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD) - require.NoError(t, err) - stream := &logproto.Stream{} - stream.Labels = labels - for it.Next() { - stream.Entries = append(stream.Entries, it.Entry()) + // Append samples. 
+ for _, userID := range userIDs { + ctx := user.InjectOrgID(context.Background(), userID) + _, err := ing.Push(ctx, &logproto.PushRequest{ + Streams: testData[userID], + }) + require.NoError(t, err) } - require.NoError(t, it.Error()) - - return stream + return userIDs, testData } -func buildTestStreams(numSeries int, linesPerSeries int, offset int) []*logproto.Stream { - m := make([]*logproto.Stream, 0, numSeries) +func buildTestStreams(offset int) []*logproto.Stream { + var m []*logproto.Stream for i := 0; i < numSeries; i++ { ss := logproto.Stream{ Labels: model.Metric{ "name": model.LabelValue(fmt.Sprintf("testmetric_%d", i)), model.JobLabel: "testjob", }.String(), - Entries: make([]logproto.Entry, 0, linesPerSeries), } - for j := 0; j < linesPerSeries; j++ { + for j := 0; j < samplesPerSeries; j++ { ss.Entries = append(ss.Entries, logproto.Entry{ Timestamp: time.Unix(int64(i+j+offset), 0), Line: "line", @@ -145,39 +146,36 @@ func buildTestStreams(numSeries int, linesPerSeries int, offset int) []*logproto return m } -func pushTestSamples(t *testing.T, ing *Ingester, numSeries, samplesPerSeries int) ([]string, map[string][]*logproto.Stream) { - userIDs := []string{"1", "2", "3"} - - // Create test samples. - testData := map[string][]*logproto.Stream{} - for i, userID := range userIDs { - testData[userID] = buildTestStreams(numSeries, samplesPerSeries, i) - } - - // Append samples. 
+// check that the store is holding data equivalent to what we expect +func (s *testStore) checkData(t *testing.T, userIDs []string, testData map[string][]*logproto.Stream) { + s.mtx.Lock() + defer s.mtx.Unlock() for _, userID := range userIDs { - ctx := user.InjectOrgID(context.Background(), userID) - _, err := ing.Push(ctx, &logproto.PushRequest{ - Streams: testData[userID], + chunks := s.chunks[userID] + streams := make([]*logproto.Stream, 0, len(chunks)) + for _, chunk := range chunks { + lokiChunk := chunk.Data.(*chunkenc.Facade).LokiChunk() + delete(chunk.Metric, nameLabel) + labels := chunk.Metric.String() + streams = append(streams, buildStreamsFromChunk(t, labels, lokiChunk)) + } + sort.Slice(streams, func(i, j int) bool { + return streams[i].Labels < streams[j].Labels }) - require.NoError(t, err) + require.Equal(t, testData[userID], streams) } - return userIDs, testData } -func TestChunkFlushingIdle(t *testing.T) { - cfg := defaultIngesterTestConfig() - cfg.FlushCheckPeriod = 20 * time.Millisecond - cfg.MaxChunkIdle = 100 * time.Millisecond - cfg.RetainPeriod = 500 * time.Millisecond - - store, ing := newTestStore(t, cfg) - - userIDs, testData := pushTestSamples(t, ing, 4, 100) - - // wait beyond idle time so samples flush - time.Sleep(cfg.MaxChunkIdle * 2) - - store.checkData(t, userIDs, testData) +func buildStreamsFromChunk(t *testing.T, labels string, chk chunkenc.Chunk) *logproto.Stream { + it, err := chk.Iterator(time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD) + require.NoError(t, err) + stream := &logproto.Stream{ + Labels: labels, + } + for it.Next() { + stream.Entries = append(stream.Entries, it.Entry()) + } + require.NoError(t, it.Error()) + return stream } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index e678f3e819812b6daadfb2f7b8fd50e55c6a3e5f..9fd5b88739019abdc089dc31e4b984daee1514a2 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -71,10 +71,6 @@ type ChunkStore interface { // New makes a 
new Ingester. func New(cfg Config, store ChunkStore) (*Ingester, error) { - if cfg.MaxChunkIdle == 0 { - cfg.MaxChunkIdle = 30 * time.Minute - } - i := &Ingester{ cfg: cfg, instances: map[string]*instance{}, diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index b0598c826d512b4aba68e0c867edeaeffac782a7..4ce05c34d0ec97e3b9c64c3259b4c37bbfe853ed 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -85,7 +85,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error { if err := s.chunks[len(s.chunks)-1].chunk.Append(&entries[i]); err != nil { appendErr = err } - s.chunks[len(s.chunks)-1].lastUpdated = time.Now() + s.chunks[len(s.chunks)-1].lastUpdated = entries[i].Timestamp } return appendErr