From 42df286b7eebc6cffcbcc4593e3aa2ec81d6c9ae Mon Sep 17 00:00:00 2001 From: Tom Wilkie <tom.wilkie@gmail.com> Date: Wed, 6 Feb 2019 19:48:43 +0000 Subject: [PATCH] Factor out mostCommon function, test it, fix it. Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com> --- pkg/iter/iterator.go | 44 ++++++++++++++++++++++++--------------- pkg/iter/iterator_test.go | 29 +++++++++++++++++++++++++- 2 files changed, 55 insertions(+), 18 deletions(-) diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index 2bdc20f4..61501b53 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -134,6 +134,11 @@ func (i *heapIterator) requeue(ei EntryIterator, advanced bool) { helpers.LogError("closing iterator", ei.Close) } +type tuple struct { + logproto.Entry + EntryIterator +} + func (i *heapIterator) Next() bool { if i.heap.Len() == 0 { return false @@ -143,10 +148,7 @@ func (i *heapIterator) Next() bool { // preserve their original order. We look at all the top entries in the // heap with the same timestamp, and pop the ones whose common value // occurs most often. - type tuple struct { - logproto.Entry - EntryIterator - } + tuples := make([]tuple, 0, i.heap.Len()) for i.heap.Len() > 0 { next := i.heap.Peek() @@ -164,30 +166,38 @@ func (i *heapIterator) Next() bool { // Find in entry which occurs most often which, due to quorum based // replication, is guaranteed to be the correct next entry. + i.currEntry = mostCommon(tuples).Entry + + // Requeue the iterators, only advancing them if they were not the + // correct pick. + for j := range tuples { + i.requeue(tuples[j].EntryIterator, tuples[j].Line != i.currEntry.Line) + } + + return true +} + +func mostCommon(tuples []tuple) tuple { sort.Slice(tuples, func(i, j int) bool { return tuples[i].Line < tuples[j].Line }) - i.currEntry = tuples[0].Entry - count, max := 1, 1 - for j := 1; j < len(tuples); j++ { - if tuples[j].Equal(tuples[j-1]) { + result := tuples[0] + count, max := 0, 0 + for i := 0; i < len(tuples)-1; i++ { + if tuples[i].Equal(tuples[i+1].Entry) { count++ continue } if count > max { - i.currEntry = tuples[j-1].Entry + result = tuples[i] max = count } - count++ + count = 0 } - - // Requeue the iterators, only advancing them if they were not the - // correct pick. - for j := range tuples { - i.requeue(tuples[j].EntryIterator, tuples[j].Line != i.currEntry.Line) + if count > max { + result = tuples[len(tuples)-1] } - - return true + return result } func (i *heapIterator) Entry() logproto.Entry { diff --git a/pkg/iter/iterator_test.go b/pkg/iter/iterator_test.go index 75596955..5594361a 100644 --- a/pkg/iter/iterator_test.go +++ b/pkg/iter/iterator_test.go @@ -5,11 +5,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/grafana/loki/pkg/logproto" "github.com/stretchr/testify/assert" ) -const testSize = 100 +const testSize = 10 func TestIterator(t *testing.T) { for i, tc := range []struct { @@ -116,3 +118,28 @@ func inverse(g generator) generator { return g(-i) } } + +func TestMostCommont(t *testing.T) { + // First is most common. + tuples := []tuple{ + {Entry: logproto.Entry{Line: "a"}}, + {Entry: logproto.Entry{Line: "b"}}, + {Entry: logproto.Entry{Line: "c"}}, + {Entry: logproto.Entry{Line: "a"}}, + {Entry: logproto.Entry{Line: "b"}}, + {Entry: logproto.Entry{Line: "c"}}, + {Entry: logproto.Entry{Line: "a"}}, + } + require.Equal(t, "a", mostCommon(tuples).Entry.Line) + + // Last is most common + tuples = []tuple{ + {Entry: logproto.Entry{Line: "a"}}, + {Entry: logproto.Entry{Line: "b"}}, + {Entry: logproto.Entry{Line: "c"}}, + {Entry: logproto.Entry{Line: "b"}}, + {Entry: logproto.Entry{Line: "c"}}, + {Entry: logproto.Entry{Line: "c"}}, + } + require.Equal(t, "c", mostCommon(tuples).Entry.Line) +} -- GitLab