Skip to content
Snippets Groups Projects
Commit 53827766 authored by Tom Wilkie's avatar Tom Wilkie
Browse files

Review feedback.


Signed-off-by: default avatarTom Wilkie <tom.wilkie@gmail.com>
parent bae89a63
No related branches found
No related tags found
No related merge requests found
......@@ -86,7 +86,7 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo
}
lastChunk := stream.chunks[len(stream.chunks)-1]
if time.Since(lastChunk.lastUpdated) < i.cfg.MaxChunkIdle && len(stream.chunks) <= 1 && !immediate {
if len(stream.chunks) == 1 && time.Since(lastChunk.lastUpdated) < i.cfg.MaxChunkIdle && !immediate {
return
}
......@@ -163,39 +163,31 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint
return nil, nil
}
// Flush a chunk after it received no samples for a long time.
if len(stream.chunks) == 1 {
chunk := &stream.chunks[0]
if time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle &&
chunk.flushed.IsZero() {
chunk.closed = true
return []*chunkDesc{chunk}, stream.labels
var result []*chunkDesc
for j := range stream.chunks {
if immediate || i.shouldFlushChunk(&stream.chunks[j]) {
result = append(result, &stream.chunks[j])
}
}
return result, stream.labels
}
if len(stream.chunks) < 2 && !immediate {
return nil, nil
func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) bool {
if !chunk.flushed.IsZero() {
return false
}
var chunks []*chunkDesc
lastIndex := len(stream.chunks)
if !immediate {
lastIndex--
// Append should close the chunk when the a new one is added.
if chunk.closed {
return true
}
for i := 0; i < lastIndex; i++ {
// Ensure no more writes happen to this chunk.
if !stream.chunks[i].closed {
stream.chunks[i].closed = true
}
// Flush this chunk if it hasn't already been successfully flushed.
if stream.chunks[i].flushed.IsZero() {
chunks = append(chunks, &stream.chunks[i])
}
if time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle {
chunk.closed = true
return true
}
return chunks, stream.labels
return false
}
func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {
......
......@@ -2,6 +2,7 @@ package ingester
import (
"fmt"
"os"
"sort"
"sync"
"testing"
......@@ -9,7 +10,9 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/go-kit/kit/log"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
......@@ -18,6 +21,29 @@ import (
"golang.org/x/net/context"
)
const (
numSeries = 10
samplesPerSeries = 100
)
func init() {
util.Logger = log.NewLogfmtLogger(os.Stdout)
}
func TestChunkFlushingIdle(t *testing.T) {
cfg := defaultIngesterTestConfig()
cfg.FlushCheckPeriod = 20 * time.Millisecond
cfg.MaxChunkIdle = 100 * time.Millisecond
cfg.RetainPeriod = 500 * time.Millisecond
store, ing := newTestStore(t, cfg)
userIDs, testData := pushTestSamples(t, ing)
// wait beyond idle time so samples flush
time.Sleep(cfg.MaxChunkIdle * 2)
store.checkData(t, userIDs, testData)
}
type testStore struct {
mtx sync.Mutex
// Chunks keyed by userID.
......@@ -36,9 +62,7 @@ func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {
}
func newDefaultTestStore(t require.TestingT) (*testStore, *Ingester) {
return newTestStore(t,
defaultIngesterTestConfig(),
)
return newTestStore(t, defaultIngesterTestConfig())
}
func defaultIngesterTestConfig() Config {
......@@ -77,59 +101,36 @@ func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
func (s *testStore) Stop() {}
// check that the store is holding data equivalent to what we expect
func (s *testStore) checkData(t *testing.T, userIDs []string, testData map[string][]*logproto.Stream) {
s.mtx.Lock()
defer s.mtx.Unlock()
for _, userID := range userIDs {
chunks := make([]chunkenc.Chunk, 0, len(s.chunks[userID]))
labels := make([]string, 0, len(s.chunks[userID]))
for _, chk := range s.chunks[userID] {
chunks = append(chunks, chk.Data.(*chunkenc.Facade).LokiChunk())
delete(chk.Metric, nameLabel)
labels = append(labels, chk.Metric.String())
}
streams := make([]*logproto.Stream, 0, len(chunks))
for i, chk := range chunks {
stream := buildStreamsFromChunk(t, labels[i], chk)
streams = append(streams, stream)
}
sort.Slice(streams, func(i, j int) bool {
return streams[i].Labels < streams[j].Labels
})
func pushTestSamples(t *testing.T, ing *Ingester) ([]string, map[string][]*logproto.Stream) {
userIDs := []string{"1", "2", "3"}
require.Equal(t, testData[userID], streams)
// Create test samples.
testData := map[string][]*logproto.Stream{}
for i, userID := range userIDs {
testData[userID] = buildTestStreams(i)
}
}
func buildStreamsFromChunk(t *testing.T, labels string, chk chunkenc.Chunk) *logproto.Stream {
//start, end := chk.Bounds()
it, err := chk.Iterator(time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD)
require.NoError(t, err)
stream := &logproto.Stream{}
stream.Labels = labels
for it.Next() {
stream.Entries = append(stream.Entries, it.Entry())
// Append samples.
for _, userID := range userIDs {
ctx := user.InjectOrgID(context.Background(), userID)
_, err := ing.Push(ctx, &logproto.PushRequest{
Streams: testData[userID],
})
require.NoError(t, err)
}
require.NoError(t, it.Error())
return stream
return userIDs, testData
}
func buildTestStreams(numSeries int, linesPerSeries int, offset int) []*logproto.Stream {
m := make([]*logproto.Stream, 0, numSeries)
func buildTestStreams(offset int) []*logproto.Stream {
var m []*logproto.Stream
for i := 0; i < numSeries; i++ {
ss := logproto.Stream{
Labels: model.Metric{
"name": model.LabelValue(fmt.Sprintf("testmetric_%d", i)),
model.JobLabel: "testjob",
}.String(),
Entries: make([]logproto.Entry, 0, linesPerSeries),
}
for j := 0; j < linesPerSeries; j++ {
for j := 0; j < samplesPerSeries; j++ {
ss.Entries = append(ss.Entries, logproto.Entry{
Timestamp: time.Unix(int64(i+j+offset), 0),
Line: "line",
......@@ -145,39 +146,36 @@ func buildTestStreams(numSeries int, linesPerSeries int, offset int) []*logproto
return m
}
func pushTestSamples(t *testing.T, ing *Ingester, numSeries, samplesPerSeries int) ([]string, map[string][]*logproto.Stream) {
userIDs := []string{"1", "2", "3"}
// Create test samples.
testData := map[string][]*logproto.Stream{}
for i, userID := range userIDs {
testData[userID] = buildTestStreams(numSeries, samplesPerSeries, i)
}
// Append samples.
// check that the store is holding data equivalent to what we expect
func (s *testStore) checkData(t *testing.T, userIDs []string, testData map[string][]*logproto.Stream) {
s.mtx.Lock()
defer s.mtx.Unlock()
for _, userID := range userIDs {
ctx := user.InjectOrgID(context.Background(), userID)
_, err := ing.Push(ctx, &logproto.PushRequest{
Streams: testData[userID],
chunks := s.chunks[userID]
streams := make([]*logproto.Stream, 0, len(chunks))
for _, chunk := range chunks {
lokiChunk := chunk.Data.(*chunkenc.Facade).LokiChunk()
delete(chunk.Metric, nameLabel)
labels := chunk.Metric.String()
streams = append(streams, buildStreamsFromChunk(t, labels, lokiChunk))
}
sort.Slice(streams, func(i, j int) bool {
return streams[i].Labels < streams[j].Labels
})
require.NoError(t, err)
require.Equal(t, testData[userID], streams)
}
return userIDs, testData
}
func TestChunkFlushingIdle(t *testing.T) {
cfg := defaultIngesterTestConfig()
cfg.FlushCheckPeriod = 20 * time.Millisecond
cfg.MaxChunkIdle = 100 * time.Millisecond
cfg.RetainPeriod = 500 * time.Millisecond
store, ing := newTestStore(t, cfg)
userIDs, testData := pushTestSamples(t, ing, 4, 100)
// wait beyond idle time so samples flush
time.Sleep(cfg.MaxChunkIdle * 2)
store.checkData(t, userIDs, testData)
func buildStreamsFromChunk(t *testing.T, labels string, chk chunkenc.Chunk) *logproto.Stream {
it, err := chk.Iterator(time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD)
require.NoError(t, err)
stream := &logproto.Stream{
Labels: labels,
}
for it.Next() {
stream.Entries = append(stream.Entries, it.Entry())
}
require.NoError(t, it.Error())
return stream
}
......@@ -71,10 +71,6 @@ type ChunkStore interface {
// New makes a new Ingester.
func New(cfg Config, store ChunkStore) (*Ingester, error) {
if cfg.MaxChunkIdle == 0 {
cfg.MaxChunkIdle = 30 * time.Minute
}
i := &Ingester{
cfg: cfg,
instances: map[string]*instance{},
......
......@@ -85,7 +85,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
if err := s.chunks[len(s.chunks)-1].chunk.Append(&entries[i]); err != nil {
appendErr = err
}
s.chunks[len(s.chunks)-1].lastUpdated = time.Now()
s.chunks[len(s.chunks)-1].lastUpdated = entries[i].Timestamp
}
return appendErr
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment