diff --git a/cmd/loki/loki-local-config.yaml b/cmd/loki/loki-local-config.yaml
index ad31034e77031776add284e06c392a31046201aa..24d7a8344a5286d418473cecbe6f0495469fb398 100644
--- a/cmd/loki/loki-local-config.yaml
+++ b/cmd/loki/loki-local-config.yaml
@@ -9,6 +9,7 @@ ingester:
     ring:
       store: inmemory
       replication_factor: 1
+  chunk_idle_period: 15m
 
 schema_config:
   configs:
@@ -28,4 +29,4 @@ storage_config:
     directory: /tmp/loki/chunks
 
 limits_config:
-  enforce_metric_name: false
\ No newline at end of file
+  enforce_metric_name: false
diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go
index 8e6b8aab3201a03cc6cda0f05bf6d33015b89943..e22f0045f3fa8486c7b659b16ef187b66ac72052 100644
--- a/pkg/ingester/flush.go
+++ b/pkg/ingester/flush.go
@@ -81,7 +81,16 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
 }
 
 func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate bool) {
-	if len(stream.chunks) <= 1 && !immediate {
+	old := false
+	if len(stream.chunks) == 1 {
+		chunk := stream.chunks[0]
+		if i.cfg.MaxChunkIdle > 0 &&
+			time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle {
+			old = true
+		}
+	}
+
+	if !old && len(stream.chunks) <= 1 && !immediate {
 		return
 	}
 
@@ -158,6 +167,20 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint
 		return nil, nil
 	}
 
+	// Flush a chunk after it received no samples for a long time.
+	if len(stream.chunks) == 1 {
+		chunk := &stream.chunks[0]
+
+		if i.cfg.MaxChunkIdle > 0 &&
+			time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle {
+			chunk.closed = true
+
+			if chunk.flushed.IsZero() {
+				return []*chunkDesc{chunk}, stream.labels
+			}
+		}
+	}
+
 	if len(stream.chunks) < 2 && !immediate {
 		return nil, nil
 	}
diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..2af46ffa192b60b7000890086ab7da8409ff9cac
--- /dev/null
+++ b/pkg/ingester/flush_test.go
@@ -0,0 +1,183 @@
+package ingester
+
+import (
+	"fmt"
+	"sort"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/cortexproject/cortex/pkg/chunk"
+	"github.com/cortexproject/cortex/pkg/ring"
+	"github.com/cortexproject/cortex/pkg/util/flagext"
+	"github.com/grafana/loki/pkg/chunkenc"
+	"github.com/grafana/loki/pkg/logproto"
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/require"
+	"github.com/weaveworks/common/user"
+	"golang.org/x/net/context"
+)
+
+type testStore struct {
+	mtx sync.Mutex
+	// Chunks keyed by userID.
+	chunks map[string][]chunk.Chunk
+}
+
+func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {
+	store := &testStore{
+		chunks: map[string][]chunk.Chunk{},
+	}
+
+	ing, err := New(cfg, store)
+	require.NoError(t, err)
+
+	return store, ing
+}
+
+func newDefaultTestStore(t require.TestingT) (*testStore, *Ingester) {
+	return newTestStore(t,
+		defaultIngesterTestConfig(),
+	)
+}
+
+func defaultIngesterTestConfig() Config {
+	consul := ring.NewInMemoryKVClient()
+	cfg := Config{}
+	flagext.DefaultValues(&cfg)
+	cfg.FlushCheckPeriod = 99999 * time.Hour
+	cfg.MaxChunkIdle = 99999 * time.Hour
+	cfg.ConcurrentFlushes = 1
+	cfg.LifecyclerConfig.RingConfig.Mock = consul
+	cfg.LifecyclerConfig.NumTokens = 1
+	cfg.LifecyclerConfig.ListenPort = func(i int) *int { return &i }(0)
+	cfg.LifecyclerConfig.Addr = "localhost"
+	cfg.LifecyclerConfig.ID = "localhost"
+	return cfg
+}
+
+func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	userID, err := user.ExtractOrgID(ctx)
+	if err != nil {
+		return err
+	}
+	for _, chunk := range chunks {
+		for k, v := range chunk.Metric {
+			if v == "" {
+				return fmt.Errorf("Chunk has blank label %q", k)
+			}
+		}
+	}
+	s.chunks[userID] = append(s.chunks[userID], chunks...)
+	return nil
+}
+
+func (s *testStore) Stop() {}
+
+// check that the store is holding data equivalent to what we expect
+func (s *testStore) checkData(t *testing.T, userIDs []string, testData map[string][]*logproto.Stream) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+	for _, userID := range userIDs {
+		chunks := make([]chunkenc.Chunk, 0, len(s.chunks[userID]))
+		labels := make([]string, 0, len(s.chunks[userID]))
+		for _, chk := range s.chunks[userID] {
+			chunks = append(chunks, chk.Data.(*chunkenc.Facade).LokiChunk())
+
+			delete(chk.Metric, nameLabel)
+			labels = append(labels, chk.Metric.String())
+		}
+
+		streams := make([]*logproto.Stream, 0, len(chunks))
+		for i, chk := range chunks {
+			stream := buildStreamsFromChunk(t, labels[i], chk)
+			streams = append(streams, stream)
+		}
+		sort.Slice(streams, func(i, j int) bool {
+			return streams[i].Labels < streams[j].Labels
+		})
+
+		require.Equal(t, testData[userID], streams)
+	}
+}
+
+func buildStreamsFromChunk(t *testing.T, labels string, chk chunkenc.Chunk) *logproto.Stream {
+	//start, end := chk.Bounds()
+	it, err := chk.Iterator(time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD)
+	require.NoError(t, err)
+
+	stream := &logproto.Stream{}
+	stream.Labels = labels
+	for it.Next() {
+		stream.Entries = append(stream.Entries, it.Entry())
+	}
+	require.NoError(t, it.Error())
+
+	return stream
+}
+
+func buildTestStreams(numSeries int, linesPerSeries int, offset int) []*logproto.Stream {
+	m := make([]*logproto.Stream, 0, numSeries)
+	for i := 0; i < numSeries; i++ {
+		ss := logproto.Stream{
+			Labels: model.Metric{
+				"name":         model.LabelValue(fmt.Sprintf("testmetric_%d", i)),
+				model.JobLabel: "testjob",
+			}.String(),
+			Entries: make([]logproto.Entry, 0, linesPerSeries),
+		}
+		for j := 0; j < linesPerSeries; j++ {
+			ss.Entries = append(ss.Entries, logproto.Entry{
+				Timestamp: time.Unix(int64(i+j+offset), 0),
+				Line:      "line",
+			})
+		}
+		m = append(m, &ss)
+	}
+
+	sort.Slice(m, func(i, j int) bool {
+		return m[i].Labels < m[j].Labels
+	})
+
+	return m
+}
+
+func pushTestSamples(t *testing.T, ing *Ingester, numSeries, samplesPerSeries int) ([]string, map[string][]*logproto.Stream) {
+	userIDs := []string{"1", "2", "3"}
+
+	// Create test samples.
+	testData := map[string][]*logproto.Stream{}
+	for i, userID := range userIDs {
+		testData[userID] = buildTestStreams(numSeries, samplesPerSeries, i)
+	}
+
+	// Append samples.
+	for _, userID := range userIDs {
+		ctx := user.InjectOrgID(context.Background(), userID)
+		_, err := ing.Push(ctx, &logproto.PushRequest{
+			Streams: testData[userID],
+		})
+		require.NoError(t, err)
+	}
+	return userIDs, testData
+}
+
+func TestChunkFlushingIdle(t *testing.T) {
+	cfg := defaultIngesterTestConfig()
+	cfg.FlushCheckPeriod = 20 * time.Millisecond
+	cfg.MaxChunkIdle = 100 * time.Millisecond
+	cfg.RetainPeriod = 500 * time.Millisecond
+
+	store, ing := newTestStore(t, cfg)
+
+	userIDs, testData := pushTestSamples(t, ing, 4, 100)
+
+	// wait beyond idle time so samples flush
+	time.Sleep(cfg.MaxChunkIdle * 2)
+
+	store.checkData(t, userIDs, testData)
+
+}
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index aff429a3c3d452a35784cdafa33ebbe7425114a3..9fd5b88739019abdc089dc31e4b984daee1514a2 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -31,6 +31,7 @@ type Config struct {
 	FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
 	FlushOpTimeout   time.Duration `yaml:"flush_op_timeout"`
 	RetainPeriod     time.Duration `yaml:"chunk_retain_period"`
+	MaxChunkIdle     time.Duration `yaml:"chunk_idle_period"`
 }
 
 // RegisterFlags registers the flags.
@@ -41,6 +42,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "")
 	f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Second, "")
 	f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 15*time.Minute, "")
+	f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "")
 }
 
 // Ingester builds chunks for incoming log streams.
@@ -51,7 +53,7 @@ type Ingester struct {
 	instances map[string]*instance
 
 	lifecycler *ring.Lifecycler
-	store      chunk.Store
+	store      ChunkStore
 
 	done sync.WaitGroup
 	quit chan struct{}
@@ -62,8 +64,13 @@ type Ingester struct {
 	flushQueuesDone sync.WaitGroup
 }
 
+// ChunkStore is the interface we need to store chunks.
+type ChunkStore interface {
+	Put(ctx context.Context, chunks []chunk.Chunk) error
+}
+
 // New makes a new Ingester.
-func New(cfg Config, store chunk.Store) (*Ingester, error) {
+func New(cfg Config, store ChunkStore) (*Ingester, error) {
 	i := &Ingester{
 		cfg:       cfg,
 		instances: map[string]*instance{},
diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go
index 18393b664974f71aa2a5208a0c4c9ddc47923025..b0598c826d512b4aba68e0c867edeaeffac782a7 100644
--- a/pkg/ingester/stream.go
+++ b/pkg/ingester/stream.go
@@ -52,6 +52,8 @@ type chunkDesc struct {
 	chunk   chunkenc.Chunk
 	closed  bool
 	flushed time.Time
+
+	lastUpdated time.Time
 }
 
 func newStream(fp model.Fingerprint, labels []client.LabelPair) *stream {
@@ -83,6 +85,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
 		if err := s.chunks[len(s.chunks)-1].chunk.Append(&entries[i]); err != nil {
 			appendErr = err
 		}
+		s.chunks[len(s.chunks)-1].lastUpdated = time.Now()
 	}
 
 	return appendErr
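Note on the refactor: `New` now accepts the narrow `ChunkStore` interface rather than the concrete `chunk.Store`, so any type with a matching `Put` method can back the ingester; that is what lets `flush_test.go` substitute its in-memory `testStore`. A minimal sketch of another implementation, assuming only the `ChunkStore` interface added above (the `noopStore` name is hypothetical, not part of this change):

```go
package ingester

import (
	"github.com/cortexproject/cortex/pkg/chunk"

	"golang.org/x/net/context"
)

// noopStore is a hypothetical ChunkStore that discards flushed chunks,
// e.g. for benchmarks that exercise the flush path without a real store.
type noopStore struct{}

func (noopStore) Put(_ context.Context, _ []chunk.Chunk) error { return nil }

// Compile-time assertion that noopStore satisfies ChunkStore.
var _ ChunkStore = noopStore{}
```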