From db36b3c74ebb074ed03c91f4ed2ef0ac13d48909 Mon Sep 17 00:00:00 2001
From: Tom Wilkie <tom.wilkie@gmail.com>
Date: Tue, 5 Feb 2019 17:00:26 +0000
Subject: [PATCH] Update heap iterator to allow for entries with duplicate
 timestamps.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
---
 pkg/iter/iterator.go      | 69 ++++++++++++++++++++++++++++-----------
 pkg/iter/iterator_test.go | 35 +++++++++++++++-----
 2 files changed, 76 insertions(+), 28 deletions(-)

diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go
index fac7364a..2bdc20f4 100644
--- a/pkg/iter/iterator.go
+++ b/pkg/iter/iterator.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	"regexp"
+	"sort"
 	"time"
 
 	"github.com/grafana/loki/pkg/helpers"
@@ -95,8 +96,9 @@ type heapIterator struct {
 		heap.Interface
 		Peek() EntryIterator
 	}
-	curr EntryIterator
-	errs []error
+	currEntry  logproto.Entry
+	currLabels string
+	errs       []error
 }
 
 // NewHeapIterator returns a new iterator which uses a heap to merge together
@@ -114,14 +116,14 @@ func NewHeapIterator(is []EntryIterator, direction logproto.Direction) EntryIter
 
 	// pre-next each iterator, drop empty.
 	for _, i := range is {
-		result.requeue(i)
+		result.requeue(i, false)
 	}
 
 	return result
 }
 
-func (i *heapIterator) requeue(ei EntryIterator) {
-	if ei.Next() {
+func (i *heapIterator) requeue(ei EntryIterator, advanced bool) {
+	if advanced || ei.Next() {
 		heap.Push(i.heap, ei)
 		return
 	}
@@ -133,38 +135,67 @@ func (i *heapIterator) requeue(ei EntryIterator) {
 }
 
 func (i *heapIterator) Next() bool {
-	if i.curr != nil {
-		i.requeue(i.curr)
-	}
-
 	if i.heap.Len() == 0 {
 		return false
 	}
 
-	i.curr = heap.Pop(i.heap).(EntryIterator)
-	currEntry := i.curr.Entry()
-
-	// keep popping entries off if they match, to dedupe
+	// We support multiple entries with the same timestamp, and we want to
+	// preserve their original order. We pop all the top entries in the
+	// heap that share the same timestamp, and then pick the one whose
+	// value occurs most often.
+	type tuple struct {
+		logproto.Entry
+		EntryIterator
+	}
+	tuples := make([]tuple, 0, i.heap.Len())
 	for i.heap.Len() > 0 {
 		next := i.heap.Peek()
-		nextEntry := next.Entry()
-		if !currEntry.Equal(nextEntry) {
+		entry := next.Entry()
+		if len(tuples) > 0 && !tuples[0].Timestamp.Equal(entry.Timestamp) {
 			break
 		}
 
-		next = heap.Pop(i.heap).(EntryIterator)
-		i.requeue(next)
+		heap.Pop(i.heap)
+		tuples = append(tuples, tuple{
+			Entry:         entry,
+			EntryIterator: next,
+		})
+	}
+
+	// Find the entry which occurs most often which, due to quorum-based
+	// replication, is guaranteed to be the correct next entry.
+	sort.Slice(tuples, func(i, j int) bool {
+		return tuples[i].Line < tuples[j].Line
+	})
+	i.currEntry = tuples[0].Entry
+	count, max := 1, 1
+	for j := 1; j < len(tuples); j++ {
+		if tuples[j].Equal(tuples[j-1]) {
+			count++
+			continue
+		}
+		if count > max {
+			i.currEntry = tuples[j-1].Entry
+			max = count
+		}
+		count++
+	}
+
+	// Requeue the iterators, advancing only those whose line matches the
+	// picked entry; the others are pushed back without being advanced.
+	for j := range tuples {
+		i.requeue(tuples[j].EntryIterator, tuples[j].Line != i.currEntry.Line)
 	}
 
 	return true
 }
 
 func (i *heapIterator) Entry() logproto.Entry {
-	return i.curr.Entry()
+	return i.currEntry
 }
 
 func (i *heapIterator) Labels() string {
-	return i.curr.Labels()
+	return i.currLabels
 }
 
 func (i *heapIterator) Error() error {
diff --git a/pkg/iter/iterator_test.go b/pkg/iter/iterator_test.go
index d2250b84..75596955 100644
--- a/pkg/iter/iterator_test.go
+++ b/pkg/iter/iterator_test.go
@@ -34,9 +34,9 @@ func TestIterator(t *testing.T) {
 		// Test dedupe of overlapping iterators with the heap iterator.
 		{
 			iterator: NewHeapIterator([]EntryIterator{
-				mkStreamIterator(testSize, offset(0)),
-				mkStreamIterator(testSize, offset(testSize/2)),
-				mkStreamIterator(testSize, offset(testSize)),
+				mkStreamIterator(testSize, offset(0, identity)),
+				mkStreamIterator(testSize, offset(testSize/2, identity)),
+				mkStreamIterator(testSize, offset(testSize, identity)),
 			}, logproto.FORWARD),
 			generator: identity,
 			length:    2 * testSize,
@@ -45,13 +45,24 @@ func TestIterator(t *testing.T) {
 		// Test dedupe of overlapping iterators with the heap iterator (backward).
 		{
 			iterator: NewHeapIterator([]EntryIterator{
-				mkStreamIterator(testSize, inverse(offset(0))),
-				mkStreamIterator(testSize, inverse(offset(-testSize/2))),
-				mkStreamIterator(testSize, inverse(offset(-testSize))),
+				mkStreamIterator(testSize, inverse(offset(0, identity))),
+				mkStreamIterator(testSize, inverse(offset(-testSize/2, identity))),
+				mkStreamIterator(testSize, inverse(offset(-testSize, identity))),
 			}, logproto.BACKWARD),
 			generator: inverse(identity),
 			length:    2 * testSize,
 		},
+
+		// Test dedupe of entries with the same timestamp but different lines.
+		{
+			iterator: NewHeapIterator([]EntryIterator{
+				mkStreamIterator(testSize, offset(0, constant(0))),
+				mkStreamIterator(testSize, offset(0, constant(0))),
+				mkStreamIterator(testSize, offset(testSize, constant(0))),
+			}, logproto.FORWARD),
+			generator: constant(0),
+			length:    2 * testSize,
+		},
 	} {
 		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
 			for i := int64(0); i < tc.length; i++ {
@@ -85,11 +96,17 @@ func identity(i int64) logproto.Entry {
 	}
 }
 
-func offset(j int64) generator {
+func offset(j int64, g generator) generator {
+	return func(i int64) logproto.Entry {
+		return g(i + j)
+	}
+}
+
+func constant(t int64) generator {
 	return func(i int64) logproto.Entry {
 		return logproto.Entry{
-			Timestamp: time.Unix(i+j, 0),
-			Line:      fmt.Sprintf("%d", i+j),
+			Timestamp: time.Unix(t, 0),
+			Line:      fmt.Sprintf("%d", i),
 		}
 	}
 }
-- 
GitLab