diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index df35c0c8bb03bfb4b6d3080c0e85d30615267e45..d506cd40ebae59f73b864714a9832a383eaefcfc 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -119,6 +119,10 @@ func NewHeapIterator(is []EntryIterator, direction logproto.Direction) EntryIter result.requeue(i, false) } + if len(is) > 0 { + result.currLabels = is[0].Labels() + } + return result } diff --git a/pkg/iter/iterator_test.go b/pkg/iter/iterator_test.go index 5594361af81c18d9ea05c4335fdb3f20084c4bf6..d42b6e1c471e478b346ced3d8b55a7f5b106f841 100644 --- a/pkg/iter/iterator_test.go +++ b/pkg/iter/iterator_test.go @@ -12,64 +12,80 @@ import ( ) const testSize = 10 +const defaultLabels = "{foo: \"baz\"}" func TestIterator(t *testing.T) { for i, tc := range []struct { iterator EntryIterator generator generator length int64 + labels string }{ // Test basic identity. { - iterator: mkStreamIterator(testSize, identity), + iterator: mkStreamIterator(testSize, identity, defaultLabels), generator: identity, length: testSize, + labels: defaultLabels, }, // Test basic identity (backwards). { - iterator: mkStreamIterator(testSize, inverse(identity)), + iterator: mkStreamIterator(testSize, inverse(identity), defaultLabels), generator: inverse(identity), length: testSize, + labels: defaultLabels, }, // Test dedupe of overlapping iterators with the heap iterator. { iterator: NewHeapIterator([]EntryIterator{ - mkStreamIterator(testSize, offset(0, identity)), - mkStreamIterator(testSize, offset(testSize/2, identity)), - mkStreamIterator(testSize, offset(testSize, identity)), + mkStreamIterator(testSize, offset(0, identity), defaultLabels), + mkStreamIterator(testSize, offset(testSize/2, identity), defaultLabels), + mkStreamIterator(testSize, offset(testSize, identity), defaultLabels), }, logproto.FORWARD), generator: identity, length: 2 * testSize, + labels: defaultLabels, }, // Test dedupe of overlapping iterators with the heap iterator (backward). { iterator: NewHeapIterator([]EntryIterator{ - mkStreamIterator(testSize, inverse(offset(0, identity))), - mkStreamIterator(testSize, inverse(offset(-testSize/2, identity))), - mkStreamIterator(testSize, inverse(offset(-testSize, identity))), + mkStreamIterator(testSize, inverse(offset(0, identity)), defaultLabels), + mkStreamIterator(testSize, inverse(offset(-testSize/2, identity)), defaultLabels), + mkStreamIterator(testSize, inverse(offset(-testSize, identity)), defaultLabels), }, logproto.BACKWARD), generator: inverse(identity), length: 2 * testSize, + labels: defaultLabels, }, // Test dedupe of entries with the same timestamp but different entries. { iterator: NewHeapIterator([]EntryIterator{ - mkStreamIterator(testSize, offset(0, constant(0))), - mkStreamIterator(testSize, offset(0, constant(0))), - mkStreamIterator(testSize, offset(testSize, constant(0))), + mkStreamIterator(testSize, offset(0, constant(0)), defaultLabels), + mkStreamIterator(testSize, offset(0, constant(0)), defaultLabels), + mkStreamIterator(testSize, offset(testSize, constant(0)), defaultLabels), }, logproto.FORWARD), generator: constant(0), length: 2 * testSize, + labels: defaultLabels, + }, + + // Test basic identity with non-default labels. + { + iterator: mkStreamIterator(testSize, identity, "{foobar: \"bazbar\"}"), + generator: identity, + length: testSize, + labels: "{foobar: \"bazbar\"}", }, } { t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { for i := int64(0); i < tc.length; i++ { assert.Equal(t, true, tc.iterator.Next()) assert.Equal(t, tc.generator(i), tc.iterator.Entry(), fmt.Sprintln("iteration", i)) + assert.Equal(t, tc.labels, tc.iterator.Labels(), fmt.Sprintln("iteration", i)) } assert.Equal(t, false, tc.iterator.Next()) @@ -81,13 +97,14 @@ func TestIterator(t *testing.T) { type generator func(i int64) logproto.Entry -func mkStreamIterator(numEntries int64, f generator) EntryIterator { +func mkStreamIterator(numEntries int64, f generator, labels string) EntryIterator { entries := []logproto.Entry{} for i := int64(0); i < numEntries; i++ { entries = append(entries, f(i)) } return newStreamIterator(&logproto.Stream{ Entries: entries, + Labels: labels, }) }