From 4d489729394ec77fdde3fd2ca2170e06a9c25313 Mon Sep 17 00:00:00 2001
From: Cyril Tovena <cyril.tovena@gmail.com>
Date: Mon, 17 Jun 2019 21:32:11 -0400
Subject: [PATCH] Query storage by iterating through chunks in batches. Also
 adds more tests for the store
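
Instead of building one iterator over every matching chunk up front, the store
now wraps chunk refs in LazyChunks and reads them through a new
batchChunkIterator, fetching at most `max-chunk-batch-size` chunks (default 50)
at a time. Chunks that overlap the next batch are carried over so entries stay
ordered and de-duplicated across batch boundaries. The per-chunk lazyIterator
is removed: LazyChunk.Iterator now returns an error if the chunk has not been
fetched. ReadBatch moves from pkg/querier to pkg/iter so the ingester, querier,
and storage tests can share it.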

---
 pkg/chunkenc/lazy_chunk.go   |  83 +-------
 pkg/ingester/instance.go     |   3 +-
 pkg/iter/iterator.go         |  25 +++
 pkg/loki/loki.go             |   5 +-
 pkg/loki/modules.go          |   4 +-
 pkg/querier/querier.go       |  27 +--
 pkg/storage/hack/main.go     |   8 +-
 pkg/storage/iterator.go      | 385 +++++++++++++++++++++++++++++++++++
 pkg/storage/iterator_test.go | 309 ++++++++++++++++++++++++++++
 pkg/storage/store.go         | 218 +++-----------------
 pkg/storage/store_test.go    | 205 ++++++++++++++++++-
 pkg/storage/util_test.go     | 245 ++++++++++++++++++++++
 12 files changed, 1198 insertions(+), 319 deletions(-)
 create mode 100644 pkg/storage/iterator.go
 create mode 100644 pkg/storage/iterator_test.go
 create mode 100644 pkg/storage/util_test.go

diff --git a/pkg/chunkenc/lazy_chunk.go b/pkg/chunkenc/lazy_chunk.go
index cf710bfe..716cc6ea 100644
--- a/pkg/chunkenc/lazy_chunk.go
+++ b/pkg/chunkenc/lazy_chunk.go
@@ -2,6 +2,7 @@ package chunkenc
 
 import (
 	"context"
+	"errors"
 	"time"
 
 	"github.com/cortexproject/cortex/pkg/chunk"
@@ -16,14 +17,6 @@ type LazyChunk struct {
 	Fetcher *chunk.Fetcher
 }
 
-func (c *LazyChunk) getChunk(ctx context.Context) (Chunk, error) {
-	chunks, err := c.Fetcher.FetchChunks(ctx, []chunk.Chunk{c.Chunk}, []string{c.Chunk.ExternalKey()})
-	if err != nil {
-		return nil, err
-	}
-	return chunks[0].Data.(*Facade).LokiChunk(), nil
-}
-
 // Iterator returns an entry iterator.
 func (c *LazyChunk) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) {
 	// If the chunk is already loaded, then use that.
@@ -32,77 +25,5 @@ func (c *LazyChunk) Iterator(ctx context.Context, from, through time.Time, direc
 		return lokiChunk.Iterator(from, through, direction, filter)
 	}
 
-	return &lazyIterator{
-		chunk:  c,
-		filter: filter,
-
-		from:      from,
-		through:   through,
-		direction: direction,
-		context:   ctx,
-	}, nil
-}
-
-type lazyIterator struct {
-	iter.EntryIterator
-
-	chunk *LazyChunk
-	err   error
-
-	from, through time.Time
-	direction     logproto.Direction
-	context       context.Context
-	filter        logql.Filter
-
-	closed bool
-}
-
-func (it *lazyIterator) Next() bool {
-	if it.err != nil {
-		return false
-	}
-
-	if it.closed {
-		return false
-	}
-
-	if it.EntryIterator != nil {
-		next := it.EntryIterator.Next()
-		if !next {
-			it.Close()
-		}
-		return next
-	}
-
-	chk, err := it.chunk.getChunk(it.context)
-	if err != nil {
-		it.err = err
-		return false
-	}
-	it.EntryIterator, it.err = chk.Iterator(it.from, it.through, it.direction, it.filter)
-	return it.Next()
-}
-
-func (it *lazyIterator) Labels() string {
-	return it.chunk.Chunk.Metric.String()
-}
-
-func (it *lazyIterator) Error() error {
-	if it.err != nil {
-		return it.err
-	}
-	if it.EntryIterator != nil {
-		return it.EntryIterator.Error()
-	}
-	return nil
-}
-
-func (it *lazyIterator) Close() error {
-	if it.EntryIterator != nil {
-		it.closed = true
-		err := it.EntryIterator.Close()
-		it.EntryIterator = nil
-		return err
-	}
-	return nil
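+	// otherwise the caller is expected to fetch the chunk first (see fetchLazyChunks in pkg/storage).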
+	return nil, errors.New("chunk is not loaded")
 }
diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go
index 2cd75c10..2b5b6882 100644
--- a/pkg/ingester/instance.go
+++ b/pkg/ingester/instance.go
@@ -18,7 +18,6 @@ import (
 	"github.com/grafana/loki/pkg/iter"
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/logql"
-	"github.com/grafana/loki/pkg/querier"
 	"github.com/grafana/loki/pkg/util"
 )
 
@@ -257,7 +256,7 @@ func isDone(ctx context.Context) bool {
 func sendBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer, limit uint32) error {
 	sent := uint32(0)
 	for sent < limit && !isDone(queryServer.Context()) {
-		batch, batchSize, err := querier.ReadBatch(i, helpers.MinUint32(queryBatchSize, limit-sent))
+		batch, batchSize, err := iter.ReadBatch(i, helpers.MinUint32(queryBatchSize, limit-sent))
 		if err != nil {
 			return err
 		}
diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go
index dfe3287a..88f93c9d 100644
--- a/pkg/iter/iterator.go
+++ b/pkg/iter/iterator.go
@@ -466,3 +466,28 @@ func (i *entryIteratorBackward) Error() error { return nil }
 func (i *entryIteratorBackward) Labels() string {
 	return ""
 }
+
+// ReadBatch reads a set of entries off an iterator.
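+// Entries are grouped into streams by their labels; it returns the batch, the
+// number of entries read, and any error reported by the iterator.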
+func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, uint32, error) {
+	streams := map[string]*logproto.Stream{}
+	respSize := uint32(0)
+	for ; respSize < size && i.Next(); respSize++ {
+		labels, entry := i.Labels(), i.Entry()
+		stream, ok := streams[labels]
+		if !ok {
+			stream = &logproto.Stream{
+				Labels: labels,
+			}
+			streams[labels] = stream
+		}
+		stream.Entries = append(stream.Entries, entry)
+	}
+
+	result := logproto.QueryResponse{
+		Streams: make([]*logproto.Stream, 0, len(streams)),
+	}
+	for _, stream := range streams {
+		result.Streams = append(result.Streams, stream)
+	}
+	return &result, respSize, i.Error()
+}
diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go
index 0622749f..b87ec002 100644
--- a/pkg/loki/loki.go
+++ b/pkg/loki/loki.go
@@ -9,7 +9,6 @@ import (
 	"google.golang.org/grpc"
 
 	"github.com/cortexproject/cortex/pkg/chunk"
-	"github.com/cortexproject/cortex/pkg/chunk/storage"
 	"github.com/cortexproject/cortex/pkg/ring"
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/validation"
@@ -20,7 +19,7 @@ import (
 	"github.com/grafana/loki/pkg/ingester"
 	"github.com/grafana/loki/pkg/ingester/client"
 	"github.com/grafana/loki/pkg/querier"
-	loki_storage "github.com/grafana/loki/pkg/storage"
+	"github.com/grafana/loki/pkg/storage"
 )
 
 // Config is the root config for Loki.
@@ -70,7 +69,7 @@ type Loki struct {
 	distributor  *distributor.Distributor
 	ingester     *ingester.Ingester
 	querier      *querier.Querier
-	store        loki_storage.Store
+	store        storage.Store
 	tableManager *chunk.TableManager
 
 	httpAuthMiddleware middleware.Interface
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index 721382e1..2a4d32be 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -197,12 +197,12 @@ func (t *Loki) initTableManager() error {
 		os.Exit(1)
 	}
 
-	tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.cfg.StorageConfig)
+	tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.cfg.StorageConfig.Config)
 	if err != nil {
 		return err
 	}
 
-	bucketClient, err := storage.NewBucketClient(t.cfg.StorageConfig)
+	bucketClient, err := storage.NewBucketClient(t.cfg.StorageConfig.Config)
 	util.CheckFatal("initializing bucket client", err)
 
 	t.tableManager, err = chunk.NewTableManager(t.cfg.TableManager, t.cfg.SchemaConfig, maxChunkAgeForTableManager, tableClient, bucketClient)
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index 804425f8..fc9a38e4 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -129,7 +129,7 @@ func (q *Querier) Query(ctx context.Context, req *logproto.QueryRequest) (*logpr
 	iterator := iter.NewHeapIterator(iterators, req.Direction)
 	defer helpers.LogError("closing iterator", iterator.Close)
 
-	resp, _, err := ReadBatch(iterator, req.Limit)
+	resp, _, err := iter.ReadBatch(iterator, req.Limit)
 	return resp, err
 }
 
@@ -201,31 +201,6 @@ func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logpr
 	}, nil
 }
 
-// ReadBatch reads a set of entries off an iterator.
-func ReadBatch(i iter.EntryIterator, size uint32) (*logproto.QueryResponse, uint32, error) {
-	streams := map[string]*logproto.Stream{}
-	respSize := uint32(0)
-	for ; respSize < size && i.Next(); respSize++ {
-		labels, entry := i.Labels(), i.Entry()
-		stream, ok := streams[labels]
-		if !ok {
-			stream = &logproto.Stream{
-				Labels: labels,
-			}
-			streams[labels] = stream
-		}
-		stream.Entries = append(stream.Entries, entry)
-	}
-
-	result := logproto.QueryResponse{
-		Streams: make([]*logproto.Stream, 0, len(streams)),
-	}
-	for _, stream := range streams {
-		result.Streams = append(result.Streams, stream)
-	}
-	return &result, respSize, i.Error()
-}
-
 // Check implements the grpc healthcheck
 func (*Querier) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
 	return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
diff --git a/pkg/storage/hack/main.go b/pkg/storage/hack/main.go
index c52f573a..00d1f73c 100644
--- a/pkg/storage/hack/main.go
+++ b/pkg/storage/hack/main.go
@@ -40,9 +40,11 @@ func main() {
 
 func getStore() (lstore.Store, error) {
 	store, err := lstore.NewStore(
-		storage.Config{
-			BoltDBConfig: local.BoltDBConfig{Directory: "/tmp/benchmark/index"},
-			FSConfig:     local.FSConfig{Directory: "/tmp/benchmark/chunks"},
+		lstore.Config{
+			Config: storage.Config{
+				BoltDBConfig: local.BoltDBConfig{Directory: "/tmp/benchmark/index"},
+				FSConfig:     local.FSConfig{Directory: "/tmp/benchmark/chunks"},
+			},
 		},
 		chunk.StoreConfig{},
 		chunk.SchemaConfig{
diff --git a/pkg/storage/iterator.go b/pkg/storage/iterator.go
new file mode 100644
index 00000000..7dee5a81
--- /dev/null
+++ b/pkg/storage/iterator.go
@@ -0,0 +1,385 @@
+package storage
+
+import (
+	"context"
+	"sort"
+	"time"
+
+	"github.com/cortexproject/cortex/pkg/chunk"
+	"github.com/cortexproject/cortex/pkg/util/spanlogger"
+	"github.com/go-kit/kit/log/level"
+	"github.com/grafana/loki/pkg/chunkenc"
+	"github.com/grafana/loki/pkg/iter"
+	"github.com/grafana/loki/pkg/logproto"
+	"github.com/grafana/loki/pkg/logql"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/pkg/labels"
+)
+
+// lazyChunks is a slice of lazy chunks that can be ordered by chunk boundaries,
+// in ascending or descending order depending on the direction.
+type lazyChunks struct {
+	chunks    []*chunkenc.LazyChunk
+	direction logproto.Direction
+}
+
+func (l lazyChunks) Len() int                  { return len(l.chunks) }
+func (l lazyChunks) Swap(i, j int)             { l.chunks[i], l.chunks[j] = l.chunks[j], l.chunks[i] }
+func (l lazyChunks) Peek() *chunkenc.LazyChunk { return l.chunks[0] }
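+// Less orders chunks by ascending `From` time (FORWARD) or descending `Through`
+// time (BACKWARD), breaking ties by series fingerprint for a deterministic order.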
+func (l lazyChunks) Less(i, j int) bool {
+	if l.direction == logproto.FORWARD {
+		t1, t2 := l.chunks[i].Chunk.From, l.chunks[j].Chunk.From
+		if !t1.Equal(t2) {
+			return t1.Before(t2)
+		}
+		return l.chunks[i].Chunk.Fingerprint < l.chunks[j].Chunk.Fingerprint
+	}
+	t1, t2 := l.chunks[i].Chunk.Through, l.chunks[j].Chunk.Through
+	if !t1.Equal(t2) {
+		return t1.After(t2)
+	}
+	return l.chunks[i].Chunk.Fingerprint > l.chunks[j].Chunk.Fingerprint
+}
+
+// pop returns the first `count` lazy chunks; the original slice is split and copied
+// to avoid retaining chunks in the slice's backing array.
+func (l *lazyChunks) pop(count int) []*chunkenc.LazyChunk {
+	if len(l.chunks) <= count {
+		old := l.chunks
+		l.chunks = nil
+		return old
+	}
+	// split the slice into two new ones and copy the parts so we don't keep a
+	// reference to the old backing array.
+	res := make([]*chunkenc.LazyChunk, count)
+	copy(res, l.chunks[0:count])
+	rest := make([]*chunkenc.LazyChunk, len(l.chunks)-count)
+	copy(rest, l.chunks[count:len(l.chunks)])
+	l.chunks = rest
+	return res
+}
+
+// batchChunkIterator is an EntryIterator that iterates through chunks in batches of `batchSize`.
+// Since chunks can overlap across batches, each iteration keeps every chunk that overlaps the
+// next batch's first chunk and carries it into the next iteration. In that case the batch
+// boundaries are reduced to the boundaries of the non-overlapping chunks.
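+//
+// For example, with a batch size of 2 and chunks A, B, C, D where B overlaps C:
+// the first batch iterates A and B up to C's start boundary, and B is carried
+// into the second batch so entries shared by B and C are merged and de-duplicated.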
+type batchChunkIterator struct {
+	chunks          lazyChunks
+	batchSize       int
+	err             error
+	curr            iter.EntryIterator
+	lastOverlapping []*chunkenc.LazyChunk
+
+	ctx      context.Context
+	matchers []*labels.Matcher
+	filter   logql.Filter
+	req      *logproto.QueryRequest
+}
+
+// newBatchChunkIterator creates a new batch iterator with the given batchSize.
+func newBatchChunkIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, batchSize int, matchers []*labels.Matcher, filter logql.Filter, req *logproto.QueryRequest) *batchChunkIterator {
+	res := &batchChunkIterator{
+		batchSize: batchSize,
+		matchers:  matchers,
+		filter:    filter,
+		req:       req,
+		ctx:       ctx,
+		chunks:    lazyChunks{direction: req.Direction, chunks: chunks},
+	}
+	sort.Sort(res.chunks)
+	return res
+}
+
+func (it *batchChunkIterator) Next() bool {
+	var err error
+	// for loop to avoid recursion
+	for {
+		if it.curr != nil && it.curr.Next() {
+			return true
+		}
+		if it.chunks.Len() == 0 {
+			return false
+		}
+		// close previous iterator
+		if it.curr != nil {
+			it.err = it.curr.Close()
+		}
+		it.curr, err = it.nextBatch()
+		if err != nil {
+			it.err = err
+			return false
+		}
+	}
+}
+
+func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
+	// pop the next batch of chunks and append/prepend the previous overlapping chunks
+	// so we can merge/de-dupe overlapping entries.
+	batch := make([]*chunkenc.LazyChunk, 0, it.batchSize+len(it.lastOverlapping))
+	if it.req.Direction == logproto.FORWARD {
+		batch = append(batch, it.lastOverlapping...)
+	}
+	batch = append(batch, it.chunks.pop(it.batchSize)...)
+	if it.req.Direction == logproto.BACKWARD {
+		batch = append(batch, it.lastOverlapping...)
+	}
+
+	from, through := it.req.Start, it.req.End
+	if it.chunks.Len() > 0 {
+		nextChunk := it.chunks.Peek()
+		// we cap the iterator boundaries at the next chunk in the queue
+		// so that overlapping chunks are kept together across batches
+		if it.req.Direction == logproto.BACKWARD {
+			from = time.Unix(0, nextChunk.Chunk.Through.UnixNano())
+		} else {
+			through = time.Unix(0, nextChunk.Chunk.From.UnixNano())
+		}
+		// we save all overlapping chunks as they are also needed in the next batch to properly order entries.
+		it.lastOverlapping = []*chunkenc.LazyChunk{}
+		for _, c := range batch {
+			if it.req.Direction == logproto.BACKWARD {
+				if c.Chunk.From.Before(nextChunk.Chunk.Through) || c.Chunk.From == nextChunk.Chunk.Through {
+					it.lastOverlapping = append(it.lastOverlapping, c)
+				}
+			} else {
+				if !c.Chunk.Through.Before(nextChunk.Chunk.From) {
+					it.lastOverlapping = append(it.lastOverlapping, c)
+				}
+			}
+		}
+	} else {
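+		// this is the last batch: move the boundary to where the previous batch
+		// stopped so carried-over chunks don't replay entries already returned.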
+		if len(it.lastOverlapping) > 0 {
+			if it.req.Direction == logproto.BACKWARD {
+				through = time.Unix(0, it.lastOverlapping[0].Chunk.From.UnixNano())
+			} else {
+				from = time.Unix(0, it.lastOverlapping[0].Chunk.Through.UnixNano())
+			}
+		}
+	}
+
+	// create the new chunks iterator from the current batch.
+	return newChunksIterator(it.ctx, batch, it.matchers, it.filter, it.req.Direction, from, through)
+}
+
+func (it *batchChunkIterator) Entry() logproto.Entry {
+	return it.curr.Entry()
+}
+
+func (it *batchChunkIterator) Labels() string {
+	return it.curr.Labels()
+}
+
+func (it *batchChunkIterator) Error() error {
+	if it.err != nil {
+		return it.err
+	}
+	if it.curr != nil {
+		return it.curr.Error()
+	}
+	return nil
+}
+
+func (it *batchChunkIterator) Close() error {
+	if it.curr != nil {
+		return it.curr.Close()
+	}
+	return nil
+}
+
+// newChunksIterator creates an iterator over a set of lazychunks.
+func newChunksIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, matchers []*labels.Matcher, filter logql.Filter, direction logproto.Direction, from, through time.Time) (iter.EntryIterator, error) {
+	chksBySeries := partitionBySeriesChunks(chunks)
+
+	// Make sure the initial chunks are loaded. This is not one chunk
+	// per series, but rather a chunk per non-overlapping iterator.
+	if err := loadFirstChunks(ctx, chksBySeries); err != nil {
+		return nil, err
+	}
+
+	// Now that we have the first chunk for each series loaded,
+	// we can proceed to filter the series that don't match.
+	chksBySeries = filterSeriesByMatchers(chksBySeries, matchers)
+
+	var allChunks []*chunkenc.LazyChunk
+	for _, series := range chksBySeries {
+		for _, chunks := range series {
+			allChunks = append(allChunks, chunks...)
+		}
+	}
+
+	// load all chunks not already loaded
+	if err := fetchLazyChunks(ctx, allChunks); err != nil {
+		return nil, err
+	}
+
+	iters, err := buildIterators(ctx, chksBySeries, filter, direction, from, through)
+	if err != nil {
+		return nil, err
+	}
+
+	return iter.NewHeapIterator(iters, direction), nil
+}
+
+func buildIterators(ctx context.Context, chks map[model.Fingerprint][][]*chunkenc.LazyChunk, filter logql.Filter, direction logproto.Direction, from, through time.Time) ([]iter.EntryIterator, error) {
+	result := make([]iter.EntryIterator, 0, len(chks))
+	for _, chunks := range chks {
+		iterator, err := buildHeapIterator(ctx, chunks, filter, direction, from, through)
+		if err != nil {
+			return nil, err
+		}
+		result = append(result, iterator)
+	}
+
+	return result, nil
+}
+
+func buildHeapIterator(ctx context.Context, chks [][]*chunkenc.LazyChunk, filter logql.Filter, direction logproto.Direction, from, through time.Time) (iter.EntryIterator, error) {
+	result := make([]iter.EntryIterator, 0, len(chks))
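+	// drop the __name__ label from the metric before rendering the stream labels.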
+	if chks[0][0].Chunk.Metric.Has("__name__") {
+		labelsBuilder := labels.NewBuilder(chks[0][0].Chunk.Metric)
+		labelsBuilder.Del("__name__")
+		chks[0][0].Chunk.Metric = labelsBuilder.Labels()
+	}
+	labels := chks[0][0].Chunk.Metric.String()
+
+	for i := range chks {
+		iterators := make([]iter.EntryIterator, 0, len(chks[i]))
+		for j := range chks[i] {
+			iterator, err := chks[i][j].Iterator(ctx, from, through, direction, filter)
+			if err != nil {
+				return nil, err
+			}
+			iterators = append(iterators, iterator)
+		}
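+		// reverse the non-overlapping iterators so we start with the most recent chunk.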
+		if direction == logproto.BACKWARD {
+			for i, j := 0, len(iterators)-1; i < j; i, j = i+1, j-1 {
+				iterators[i], iterators[j] = iterators[j], iterators[i]
+			}
+		}
+		result = append(result, iter.NewNonOverlappingIterator(iterators, labels))
+	}
+
+	return iter.NewHeapIterator(result, direction), nil
+}
+
+func filterSeriesByMatchers(chks map[model.Fingerprint][][]*chunkenc.LazyChunk, matchers []*labels.Matcher) map[model.Fingerprint][][]*chunkenc.LazyChunk {
+outer:
+	for fp, chunks := range chks {
+		for _, matcher := range matchers {
+			if !matcher.Matches(chunks[0][0].Chunk.Metric.Get(matcher.Name)) {
+				delete(chks, fp)
+				continue outer
+			}
+		}
+	}
+	return chks
+}
+
+func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error {
+	log, ctx := spanlogger.New(ctx, "LokiStore.fetchLazyChunks")
+	defer log.Finish()
+
+	var totalChunks int
+	chksByFetcher := map[*chunk.Fetcher][]*chunkenc.LazyChunk{}
+	for _, c := range chunks {
+		if c.Chunk.Data == nil {
+			chksByFetcher[c.Fetcher] = append(chksByFetcher[c.Fetcher], c)
+			totalChunks++
+		}
+	}
+	if len(chksByFetcher) == 0 {
+		return nil
+	}
+	level.Debug(log).Log("msg", "loading lazy chunks", "chunks", totalChunks)
+
+	errChan := make(chan error)
+	for fetcher, chunks := range chksByFetcher {
+		go func(fetcher *chunk.Fetcher, chunks []*chunkenc.LazyChunk) {
+
+			keys := make([]string, 0, len(chunks))
+			chks := make([]chunk.Chunk, 0, len(chunks))
+			index := make(map[string]*chunkenc.LazyChunk, len(chunks))
+
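+			// sort the chunks by external key so the fetch request is deterministic.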
+			sort.Slice(chunks, func(i, j int) bool { return chunks[i].Chunk.ExternalKey() < chunks[j].Chunk.ExternalKey() })
+			for _, chk := range chunks {
+				key := chk.Chunk.ExternalKey()
+				keys = append(keys, key)
+				chks = append(chks, chk.Chunk)
+				index[key] = chk
+			}
+			chks, err := fetcher.FetchChunks(ctx, chks, keys)
+			if err != nil {
+				errChan <- err
+				return
+			}
+			// re-assign the fetched chunks by key since FetchChunks doesn't guarantee order.
+			for _, chk := range chks {
+				index[chk.ExternalKey()].Chunk = chk
+			}
+
+			errChan <- nil
+		}(fetcher, chunks)
+	}
+
+	var lastErr error
+	for i := 0; i < len(chksByFetcher); i++ {
+		if err := <-errChan; err != nil {
+			lastErr = err
+		}
+	}
+
+	return lastErr
+}
+
+func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]*chunkenc.LazyChunk) error {
+	var toLoad []*chunkenc.LazyChunk
+	for _, lchks := range chks {
+		for _, lchk := range lchks {
+			if len(lchk) == 0 {
+				continue
+			}
+			toLoad = append(toLoad, lchk[0])
+		}
+	}
+	return fetchLazyChunks(ctx, toLoad)
+}
+
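+// partitionBySeriesChunks groups chunks by series fingerprint, then splits each
+// group into lists of non-overlapping chunks.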
+func partitionBySeriesChunks(chunks []*chunkenc.LazyChunk) map[model.Fingerprint][][]*chunkenc.LazyChunk {
+	chunksByFp := map[model.Fingerprint][]*chunkenc.LazyChunk{}
+	for _, c := range chunks {
+		fp := c.Chunk.Fingerprint
+		chunksByFp[fp] = append(chunksByFp[fp], c)
+	}
+	result := make(map[model.Fingerprint][][]*chunkenc.LazyChunk, len(chunksByFp))
+
+	for fp, chks := range chunksByFp {
+		result[fp] = partitionOverlappingChunks(chks)
+	}
+
+	return result
+}
+
+// partitionOverlappingChunks splits the list of chunks into different non-overlapping lists.
+// TODO: this might reverse the order.
+func partitionOverlappingChunks(chunks []*chunkenc.LazyChunk) [][]*chunkenc.LazyChunk {
+	sort.Slice(chunks, func(i, j int) bool {
+		return chunks[i].Chunk.From < chunks[j].Chunk.From
+	})
+
+	css := [][]*chunkenc.LazyChunk{}
+outer:
+	for _, c := range chunks {
+		for i, cs := range css {
+			// If the chunk doesn't overlap with the current list, then add it to it.
+			if cs[len(cs)-1].Chunk.Through.Before(c.Chunk.From) {
+				css[i] = append(css[i], c)
+				continue outer
+			}
+		}
+		// If the chunk overlaps with every existing list, then create a new list.
+		cs := make([]*chunkenc.LazyChunk, 0, len(chunks)/(len(css)+1))
+		cs = append(cs, c)
+		css = append(css, cs)
+	}
+
+	return css
+}
diff --git a/pkg/storage/iterator_test.go b/pkg/storage/iterator_test.go
new file mode 100644
index 00000000..8f043481
--- /dev/null
+++ b/pkg/storage/iterator_test.go
@@ -0,0 +1,309 @@
+package storage
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/grafana/loki/pkg/chunkenc"
+	"github.com/grafana/loki/pkg/iter"
+	"github.com/grafana/loki/pkg/logproto"
+)
+
+func Test_newBatchChunkIterator(t *testing.T) {
+
+	tests := map[string]struct {
+		chunks     []*chunkenc.LazyChunk
+		expected   []*logproto.Stream
+		matchers   string
+		start, end time.Time
+		direction  logproto.Direction
+	}{
+		"forward with overlap": {
+			[]*chunkenc.LazyChunk{
+				newLazyChunk(logproto.Stream{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from,
+							Line:      "1",
+						},
+						{
+							Timestamp: from.Add(time.Millisecond),
+							Line:      "2",
+						},
+					},
+				}),
+				newLazyChunk(logproto.Stream{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from.Add(time.Millisecond),
+							Line:      "2",
+						},
+						{
+							Timestamp: from.Add(2 * time.Millisecond),
+							Line:      "3",
+						},
+					},
+				}),
+				newLazyChunk(logproto.Stream{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from.Add(time.Millisecond),
+							Line:      "2",
+						},
+						{
+							Timestamp: from.Add(2 * time.Millisecond),
+							Line:      "3",
+						},
+					},
+				}),
+				newLazyChunk(logproto.Stream{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from.Add(2 * time.Millisecond),
+							Line:      "3",
+						},
+						{
+							Timestamp: from.Add(3 * time.Millisecond),
+							Line:      "4",
+						},
+					},
+				}),
+			},
+			[]*logproto.Stream{
+				{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from,
+							Line:      "1",
+						},
+						{
+							Timestamp: from.Add(time.Millisecond),
+							Line:      "2",
+						},
+						{
+							Timestamp: from.Add(2 * time.Millisecond),
+							Line:      "3",
+						},
+					},
+				},
+			},
+			fooLabels,
+			from, from.Add(3 * time.Millisecond),
+			logproto.FORWARD,
+		},
+		"backward with overlap": {
+			[]*chunkenc.LazyChunk{
+				newLazyChunk(logproto.Stream{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from,
+							Line:      "1",
+						},
+						{
+							Timestamp: from.Add(time.Millisecond),
+							Line:      "2",
+						},
+					},
+				}),
+				newLazyChunk(logproto.Stream{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from.Add(time.Millisecond),
+							Line:      "2",
+						},
+						{
+							Timestamp: from.Add(2 * time.Millisecond),
+							Line:      "3",
+						},
+					},
+				}),
+				newLazyChunk(logproto.Stream{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from.Add(time.Millisecond),
+							Line:      "2",
+						},
+						{
+							Timestamp: from.Add(2 * time.Millisecond),
+							Line:      "3",
+						},
+					},
+				}),
+				newLazyChunk(logproto.Stream{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from.Add(2 * time.Millisecond),
+							Line:      "3",
+						},
+						{
+							Timestamp: from.Add(3 * time.Millisecond),
+							Line:      "4",
+						},
+					},
+				}),
+			},
+			[]*logproto.Stream{
+				{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from.Add(2 * time.Millisecond),
+							Line:      "3",
+						},
+						{
+							Timestamp: from.Add(time.Millisecond),
+							Line:      "2",
+						},
+						{
+							Timestamp: from,
+							Line:      "1",
+						},
+					},
+				},
+			},
+			fooLabels,
+			from, from.Add(3 * time.Millisecond),
+			logproto.BACKWARD,
+		},
+		"forward without overlap": {
+			[]*chunkenc.LazyChunk{
+				newLazyChunk(logproto.Stream{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from,
+							Line:      "1",
+						},
+						{
+							Timestamp: from.Add(time.Millisecond),
+							Line:      "2",
+						},
+					},
+				}),
+				newLazyChunk(logproto.Stream{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from.Add(2 * time.Millisecond),
+							Line:      "3",
+						},
+					},
+				}),
+				newLazyChunk(logproto.Stream{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from.Add(3 * time.Millisecond),
+							Line:      "4",
+						},
+					},
+				}),
+			},
+			[]*logproto.Stream{
+				{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from,
+							Line:      "1",
+						},
+						{
+							Timestamp: from.Add(time.Millisecond),
+							Line:      "2",
+						},
+						{
+							Timestamp: from.Add(2 * time.Millisecond),
+							Line:      "3",
+						},
+					},
+				},
+			},
+			fooLabels,
+			from, from.Add(3 * time.Millisecond),
+			logproto.FORWARD,
+		},
+		"backward without overlap": {
+			[]*chunkenc.LazyChunk{
+				newLazyChunk(logproto.Stream{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from,
+							Line:      "1",
+						},
+						{
+							Timestamp: from.Add(time.Millisecond),
+							Line:      "2",
+						},
+					},
+				}),
+				newLazyChunk(logproto.Stream{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from.Add(2 * time.Millisecond),
+							Line:      "3",
+						},
+					},
+				}),
+				newLazyChunk(logproto.Stream{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from.Add(3 * time.Millisecond),
+							Line:      "4",
+						},
+					},
+				}),
+			},
+			[]*logproto.Stream{
+				{
+					Labels: fooLabels,
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from.Add(2 * time.Millisecond),
+							Line:      "3",
+						},
+						{
+							Timestamp: from.Add(time.Millisecond),
+							Line:      "2",
+						},
+						{
+							Timestamp: from,
+							Line:      "1",
+						},
+					},
+				},
+			},
+			fooLabels,
+			from, from.Add(3 * time.Millisecond),
+			logproto.BACKWARD,
+		},
+	}
+
+	for name, tt := range tests {
+		tt := tt
+		t.Run(name, func(t *testing.T) {
+			it := newBatchChunkIterator(context.Background(), tt.chunks, 2, newMatchers(tt.matchers), nil, newQuery("", tt.start, tt.end, tt.direction))
+			streams, _, err := iter.ReadBatch(it, 1000)
+			_ = it.Close()
+			if err != nil {
+				t.Fatalf("error reading batch %s", err)
+			}
+
+			assertStream(t, tt.expected, streams.Streams)
+
+		})
+	}
+
+}
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 7e156637..b659bea5 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -2,7 +2,7 @@ package storage
 
 import (
 	"context"
-	"sort"
+	"flag"
 
 	"github.com/cortexproject/cortex/pkg/chunk"
 	"github.com/cortexproject/cortex/pkg/chunk/storage"
@@ -12,11 +12,22 @@ import (
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/logql"
 	"github.com/grafana/loki/pkg/util"
-	"github.com/opentracing/opentracing-go"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"
 )
 
+// Config is the Loki storage configuration.
+type Config struct {
+	storage.Config    `yaml:",inline"`
+	MaxChunkBatchSize int `yaml:"max_chunk_batch_size"`
+}
+
+// RegisterFlags adds the flags required to configure this Config to the given FlagSet.
+func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
+	cfg.Config.RegisterFlags(f)
+	f.IntVar(&cfg.MaxChunkBatchSize, "max-chunk-batch-size", 50, "The maximum number of chunks to fetch per batch.")
+}
+
 // Store is the Loki chunk store to retrieve and save chunks.
 type Store interface {
 	chunk.Store
@@ -25,16 +36,18 @@ type Store interface {
 
 type store struct {
 	chunk.Store
+	cfg Config
 }
 
 // NewStore creates a new Loki Store using configuration supplied.
-func NewStore(cfg storage.Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits *validation.Overrides) (Store, error) {
-	s, err := storage.NewStore(cfg, storeCfg, schemaCfg, limits)
+func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits *validation.Overrides) (Store, error) {
+	s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg, limits)
 	if err != nil {
 		return nil, err
 	}
 	return &store{
 		Store: s,
+		cfg:   cfg,
 	}, nil
 }
 
@@ -63,28 +76,18 @@ func (s *store) LazyQuery(ctx context.Context, req *logproto.QueryRequest) (iter
 			return nil, err
 		}
 
+		var totalChunks int
 		for i := range chks {
 			chks[i] = filterChunksByTime(from, through, chks[i])
+			totalChunks += len(chks[i])
 		}
-
-		chksBySeries := partitionBySeriesChunks(chks, fetchers)
-
-		// Make sure the initial chunks are loaded. This is not one chunk
-		// per series, but rather a chunk per non-overlapping iterator.
-		if err := loadFirstChunks(ctx, chksBySeries, req); err != nil {
-			return nil, err
-		}
-
-		// Now that we have the first chunk for each series loaded,
-		// we can proceed to filter the series that don't match.
-		chksBySeries = filterSeriesByMatchers(chksBySeries, matchers, req)
-
-		iters, err := buildIterators(ctx, req, chksBySeries, filter)
-		if err != nil {
-			return nil, err
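+		// pair each chunk ref with its fetcher so chunk data can be lazily loaded per batch.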
+		lazyChunks := make([]*chunkenc.LazyChunk, 0, totalChunks)
+		for i := range chks {
+			for _, c := range chks[i] {
+				lazyChunks = append(lazyChunks, &chunkenc.LazyChunk{Chunk: c, Fetcher: fetchers[i]})
+			}
 		}
-
-		return iter.NewHeapIterator(iters, req.Direction), nil
+		return newBatchChunkIterator(ctx, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, filter, req), nil
 	})
 
 	return expr.Eval(querier)
@@ -100,174 +103,3 @@ func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk.
 	}
 	return filtered
 }
-
-func filterSeriesByMatchers(chks map[model.Fingerprint][][]chunkenc.LazyChunk, matchers []*labels.Matcher, req *logproto.QueryRequest) map[model.Fingerprint][][]chunkenc.LazyChunk {
-outer:
-	for fp, chunks := range chks {
-		for _, matcher := range matchers {
-			// checks matchers against the last chunk if we're doing BACKWARD
-			if req.Direction == logproto.BACKWARD {
-				if !matcher.Matches(chunks[0][len(chunks[0])-1].Chunk.Metric.Get(matcher.Name)) {
-					delete(chks, fp)
-					continue outer
-				}
-			} else {
-				if !matcher.Matches(chunks[0][0].Chunk.Metric.Get(matcher.Name)) {
-					delete(chks, fp)
-					continue outer
-				}
-			}
-
-		}
-	}
-
-	return chks
-}
-
-func buildIterators(ctx context.Context, req *logproto.QueryRequest, chks map[model.Fingerprint][][]chunkenc.LazyChunk, filter logql.Filter) ([]iter.EntryIterator, error) {
-	result := make([]iter.EntryIterator, 0, len(chks))
-	for _, chunks := range chks {
-		iterator, err := buildHeapIterator(ctx, req, chunks, filter)
-		if err != nil {
-			return nil, err
-		}
-		result = append(result, iterator)
-	}
-
-	return result, nil
-}
-
-func buildHeapIterator(ctx context.Context, req *logproto.QueryRequest, chks [][]chunkenc.LazyChunk, filter logql.Filter) (iter.EntryIterator, error) {
-	result := make([]iter.EntryIterator, 0, len(chks))
-	var fetchedChunkIndex int
-	if req.Direction == logproto.BACKWARD {
-		fetchedChunkIndex = len(chks[0]) - 1
-	}
-	if chks[0][fetchedChunkIndex].Chunk.Metric.Has("__name__") {
-		labelsBuilder := labels.NewBuilder(chks[0][fetchedChunkIndex].Chunk.Metric)
-		labelsBuilder.Del("__name__")
-		chks[0][fetchedChunkIndex].Chunk.Metric = labelsBuilder.Labels()
-	}
-	labels := chks[0][fetchedChunkIndex].Chunk.Metric.String()
-
-	for i := range chks {
-		iterators := make([]iter.EntryIterator, 0, len(chks[i]))
-		for j := range chks[i] {
-			iterator, err := chks[i][j].Iterator(ctx, req.Start, req.End, req.Direction, filter)
-			if err != nil {
-				return nil, err
-			}
-			iterators = append(iterators, iterator)
-		}
-		// reverse chunks to start with the last one.
-		if req.Direction == logproto.BACKWARD {
-			for i, j := 0, len(iterators)-1; i < j; i, j = i+1, j-1 {
-				iterators[i], iterators[j] = iterators[j], iterators[i]
-			}
-		}
-		result = append(result, iter.NewNonOverlappingIterator(iterators, labels))
-	}
-
-	return iter.NewHeapIterator(result, req.Direction), nil
-}
-
-func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]chunkenc.LazyChunk, req *logproto.QueryRequest) error {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "loadFirstChunks")
-	defer sp.Finish()
-
-	// If chunks span buckets, then we'll have different fetchers for each bucket.
-	chksByFetcher := map[*chunk.Fetcher][]*chunkenc.LazyChunk{}
-	for _, lchks := range chks {
-		for _, lchk := range lchks {
-			if len(lchk) == 0 {
-				continue
-			}
-			// load the last chunk if we're doing BACKWARD
-			if req.Direction == logproto.BACKWARD {
-				chksByFetcher[lchk[0].Fetcher] = append(chksByFetcher[lchk[0].Fetcher], &lchk[len(lchk)-1])
-			} else {
-				chksByFetcher[lchk[0].Fetcher] = append(chksByFetcher[lchk[0].Fetcher], &lchk[0])
-			}
-		}
-	}
-
-	errChan := make(chan error)
-	for fetcher, chunks := range chksByFetcher {
-		go func(fetcher *chunk.Fetcher, chunks []*chunkenc.LazyChunk) {
-
-			keys := make([]string, 0, len(chunks))
-			chks := make([]chunk.Chunk, 0, len(chunks))
-			index := make(map[string]*chunkenc.LazyChunk, len(chunks))
-
-			for _, chk := range chunks {
-				key := chk.Chunk.ExternalKey()
-				keys = append(keys, key)
-				chks = append(chks, chk.Chunk)
-				index[key] = chk
-			}
-			chks, err := fetcher.FetchChunks(ctx, chks, keys)
-			if err != nil {
-				errChan <- err
-				return
-			}
-			// assign fetched chunk by key as FetchChunks doesn't guarantee the order.
-			for _, chk := range chks {
-				index[chk.ExternalKey()].Chunk = chk
-			}
-
-			errChan <- nil
-		}(fetcher, chunks)
-	}
-
-	var lastErr error
-	for i := 0; i < len(chksByFetcher); i++ {
-		if err := <-errChan; err != nil {
-			lastErr = err
-		}
-	}
-
-	return lastErr
-}
-
-func partitionBySeriesChunks(chunks [][]chunk.Chunk, fetchers []*chunk.Fetcher) map[model.Fingerprint][][]chunkenc.LazyChunk {
-	chunksByFp := map[model.Fingerprint][]chunkenc.LazyChunk{}
-	for i, chks := range chunks {
-		for _, c := range chks {
-			fp := c.Fingerprint
-			chunksByFp[fp] = append(chunksByFp[fp], chunkenc.LazyChunk{Chunk: c, Fetcher: fetchers[i]})
-		}
-	}
-
-	result := make(map[model.Fingerprint][][]chunkenc.LazyChunk, len(chunksByFp))
-
-	for fp, chks := range chunksByFp {
-		result[fp] = partitionOverlappingChunks(chks)
-	}
-
-	return result
-}
-
-// partitionOverlappingChunks splits the list of chunks into different non-overlapping lists.
-func partitionOverlappingChunks(chunks []chunkenc.LazyChunk) [][]chunkenc.LazyChunk {
-	sort.Slice(chunks, func(i, j int) bool {
-		return chunks[i].Chunk.From < chunks[j].Chunk.From
-	})
-
-	css := [][]chunkenc.LazyChunk{}
-outer:
-	for _, c := range chunks {
-		for i, cs := range css {
-			// If the chunk doesn't overlap with the current list, then add it to it.
-			if cs[len(cs)-1].Chunk.Through.Before(c.Chunk.From) {
-				css[i] = append(css[i], c)
-				continue outer
-			}
-		}
-		// If the chunk overlaps with every existing list, then create a new list.
-		cs := make([]chunkenc.LazyChunk, 0, len(chunks)/(len(css)+1))
-		cs = append(cs, c)
-		css = append(css, cs)
-	}
-
-	return css
-}
diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go
index 899fa12b..fd34bfe0 100644
--- a/pkg/storage/store_test.go
+++ b/pkg/storage/store_test.go
@@ -3,17 +3,17 @@ package storage
 import (
 	"context"
 	"log"
+	"net/http"
+	_ "net/http/pprof"
 	"runtime"
 	"testing"
 	"time"
 
-	"net/http"
-	_ "net/http/pprof"
-
 	"github.com/cortexproject/cortex/pkg/chunk"
 	"github.com/cortexproject/cortex/pkg/chunk/local"
 	"github.com/cortexproject/cortex/pkg/chunk/storage"
 	"github.com/cortexproject/cortex/pkg/util/validation"
+	"github.com/grafana/loki/pkg/iter"
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/prometheus/common/model"
 	"github.com/weaveworks/common/user"
@@ -23,7 +23,7 @@ var (
 	start      = model.Time(1523750400000)
 	m          runtime.MemStats
 	ctx        = user.InjectOrgID(context.Background(), "fake")
-	chunkStore = getStore()
+	chunkStore = getLocalStore()
 )
 
 //go test -bench=. -benchmem -memprofile memprofile.out -cpuprofile profile.out
@@ -107,7 +107,7 @@ func benchmarkStoreQuery(b *testing.B, query *logproto.QueryRequest) {
 			b.Fatal(err)
 		}
 		res := []logproto.Entry{}
-		printHeap(b, false)
+		printHeap(b, true)
 		j := uint32(0)
 		for iter.Next() {
 			j++
@@ -138,10 +138,13 @@ func printHeap(b *testing.B, show bool) {
 	}
 }
 
-func getStore() Store {
-	store, err := NewStore(storage.Config{
-		BoltDBConfig: local.BoltDBConfig{Directory: "/tmp/benchmark/index"},
-		FSConfig:     local.FSConfig{Directory: "/tmp/benchmark/chunks"},
+func getLocalStore() Store {
+	store, err := NewStore(Config{
+		Config: storage.Config{
+			BoltDBConfig: local.BoltDBConfig{Directory: "/tmp/benchmark/index"},
+			FSConfig:     local.FSConfig{Directory: "/tmp/benchmark/chunks"},
+		},
+		MaxChunkBatchSize: 10,
 	}, chunk.StoreConfig{}, chunk.SchemaConfig{
 		Configs: []chunk.PeriodConfig{
 			{
@@ -161,3 +164,187 @@ func getStore() Store {
 	}
 	return store
 }
+
+func Test_store_LazyQuery(t *testing.T) {
+
+	tests := []struct {
+		name     string
+		req      *logproto.QueryRequest
+		expected []*logproto.Stream
+	}{
+		{
+			"all",
+			newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), logproto.FORWARD),
+			[]*logproto.Stream{
+				{
+					Labels: "{foo=\"bar\"}",
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from,
+							Line:      "1",
+						},
+
+						{
+							Timestamp: from.Add(time.Millisecond),
+							Line:      "2",
+						},
+						{
+							Timestamp: from.Add(2 * time.Millisecond),
+							Line:      "3",
+						},
+						{
+							Timestamp: from.Add(3 * time.Millisecond),
+							Line:      "4",
+						},
+
+						{
+							Timestamp: from.Add(4 * time.Millisecond),
+							Line:      "5",
+						},
+						{
+							Timestamp: from.Add(5 * time.Millisecond),
+							Line:      "6",
+						},
+					},
+				},
+				{
+					Labels: "{foo=\"bazz\"}",
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from,
+							Line:      "1",
+						},
+
+						{
+							Timestamp: from.Add(time.Millisecond),
+							Line:      "2",
+						},
+						{
+							Timestamp: from.Add(2 * time.Millisecond),
+							Line:      "3",
+						},
+						{
+							Timestamp: from.Add(3 * time.Millisecond),
+							Line:      "4",
+						},
+
+						{
+							Timestamp: from.Add(4 * time.Millisecond),
+							Line:      "5",
+						},
+						{
+							Timestamp: from.Add(5 * time.Millisecond),
+							Line:      "6",
+						},
+					},
+				},
+			},
+		},
+		{
+			"filter regex",
+			newQuery("{foo=~\"ba.*\"} |~ \"1|2|3\" !~ \"2|3\"", from, from.Add(6*time.Millisecond), logproto.FORWARD),
+			[]*logproto.Stream{
+				{
+					Labels: "{foo=\"bar\"}",
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from,
+							Line:      "1",
+						},
+					},
+				},
+				{
+					Labels: "{foo=\"bazz\"}",
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from,
+							Line:      "1",
+						},
+					},
+				},
+			},
+		},
+		{
+			"filter matcher",
+			newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), logproto.FORWARD),
+			[]*logproto.Stream{
+				{
+					Labels: "{foo=\"bar\"}",
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from,
+							Line:      "1",
+						},
+
+						{
+							Timestamp: from.Add(time.Millisecond),
+							Line:      "2",
+						},
+						{
+							Timestamp: from.Add(2 * time.Millisecond),
+							Line:      "3",
+						},
+						{
+							Timestamp: from.Add(3 * time.Millisecond),
+							Line:      "4",
+						},
+
+						{
+							Timestamp: from.Add(4 * time.Millisecond),
+							Line:      "5",
+						},
+						{
+							Timestamp: from.Add(5 * time.Millisecond),
+							Line:      "6",
+						},
+					},
+				},
+			},
+		},
+		{
+			"filter time",
+			newQuery("{foo=~\"ba.*\"}", from, from.Add(time.Millisecond), logproto.FORWARD),
+			[]*logproto.Stream{
+				{
+					Labels: "{foo=\"bar\"}",
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from,
+							Line:      "1",
+						},
+					},
+				},
+				{
+					Labels: "{foo=\"bazz\"}",
+					Entries: []logproto.Entry{
+						{
+							Timestamp: from,
+							Line:      "1",
+						},
+					},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			s := &store{
+				Store: storeFixture,
+				cfg: Config{
+					MaxChunkBatchSize: 10,
+				},
+			}
+			it, err := s.LazyQuery(context.Background(), tt.req)
+			if err != nil {
+				t.Errorf("store.LazyQuery() error = %v", err)
+				return
+			}
+			streams, _, err := iter.ReadBatch(it, tt.req.Limit)
+			_ = it.Close()
+			if err != nil {
+				t.Fatalf("error reading batch %s", err)
+			}
+			assertStream(t, tt.expected, streams.Streams)
+		})
+	}
+}
diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go
new file mode 100644
index 00000000..6aa0cfaf
--- /dev/null
+++ b/pkg/storage/util_test.go
@@ -0,0 +1,245 @@
+package storage
+
+import (
+	"context"
+	"sort"
+	"testing"
+	"time"
+
+	"github.com/cortexproject/cortex/pkg/chunk"
+	"github.com/cortexproject/cortex/pkg/chunk/cache"
+	"github.com/cortexproject/cortex/pkg/ingester/client"
+	"github.com/davecgh/go-spew/spew"
+	"github.com/grafana/loki/pkg/chunkenc"
+	"github.com/grafana/loki/pkg/logproto"
+	"github.com/grafana/loki/pkg/logql"
+	"github.com/grafana/loki/pkg/util"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/stretchr/testify/assert"
+)
+
+var fooLabels = "{foo=\"bar\"}"
+
+var from = time.Unix(0, time.Millisecond.Nanoseconds())
+
+func assertStream(t *testing.T, expected, actual []*logproto.Stream) {
+	if len(expected) != len(actual) {
+		t.Fatalf("error stream length are different expected %d actual %d\n%s", len(expected), len(actual), spew.Sdump(expected, actual))
+		return
+	}
+	sort.Slice(expected, func(i int, j int) bool { return expected[i].Labels < expected[j].Labels })
+	sort.Slice(actual, func(i int, j int) bool { return actual[i].Labels < actual[j].Labels })
+	for i := range expected {
+		assert.Equal(t, expected[i].Labels, actual[i].Labels)
+		if len(expected[i].Entries) != len(actual[i].Entries) {
+			t.Fatalf("error entries length are different expected %d actual%d\n%s", len(expected[i].Entries), len(actual[i].Entries), spew.Sdump(expected[i].Entries, actual[i].Entries))
+
+			return
+		}
+		for j := range expected[i].Entries {
+			assert.Equal(t, expected[i].Entries[j].Timestamp.UnixNano(), actual[i].Entries[j].Timestamp.UnixNano())
+			assert.Equal(t, expected[i].Entries[j].Line, actual[i].Entries[j].Line)
+		}
+	}
+}
+
+func newLazyChunk(stream logproto.Stream) *chunkenc.LazyChunk {
+	return &chunkenc.LazyChunk{
+		Fetcher: nil,
+		Chunk:   newChunk(stream),
+	}
+}
+
+func newChunk(stream logproto.Stream) chunk.Chunk {
+	lbs, err := util.ToClientLabels(stream.Labels)
+	if err != nil {
+		panic(err)
+	}
+	l := client.FromLabelAdaptersToLabels(lbs)
+	if !l.Has(labels.MetricName) {
+		builder := labels.NewBuilder(l)
+		builder.Set(labels.MetricName, "logs")
+		l = builder.Labels()
+	}
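+	// seed from/through with the first entry's timestamp, then widen them to cover every entry.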
+	from, through := model.TimeFromUnixNano(stream.Entries[0].Timestamp.UnixNano()), model.TimeFromUnixNano(stream.Entries[0].Timestamp.UnixNano())
+	chk := chunkenc.NewMemChunk(chunkenc.EncGZIP)
+	for _, e := range stream.Entries {
+		if e.Timestamp.UnixNano() < from.UnixNano() {
+			from = model.TimeFromUnixNano(e.Timestamp.UnixNano())
+		}
+		if e.Timestamp.UnixNano() > through.UnixNano() {
+			through = model.TimeFromUnixNano(e.Timestamp.UnixNano())
+		}
+		_ = chk.Append(&e)
+	}
+	chk.Close()
+	c := chunk.NewChunk("fake", client.Fingerprint(l), l, chunkenc.NewFacade(chk), from, through)
+	// force the checksum creation
+	if err := c.Encode(); err != nil {
+		panic(err)
+	}
+	return c
+}
+
+func newMatchers(matchers string) []*labels.Matcher {
+	ls, err := logql.ParseExpr(matchers)
+	if err != nil {
+		panic(err)
+	}
+	return ls.Matchers()
+}
+
+func newQuery(query string, start, end time.Time, direction logproto.Direction) *logproto.QueryRequest {
+	return &logproto.QueryRequest{
+		Query:     query,
+		Start:     start,
+		Limit:     1000,
+		End:       end,
+		Direction: direction,
+	}
+}
+
+type mockChunkStore struct {
+	chunks []chunk.Chunk
+}
+
+func newMockChunkStore(streams []*logproto.Stream) *mockChunkStore {
+	chunks := make([]chunk.Chunk, 0, len(streams))
+	for _, s := range streams {
+		chunks = append(chunks, newChunk(*s))
+	}
+	return &mockChunkStore{chunks: chunks}
+}
+func (m *mockChunkStore) Put(ctx context.Context, chunks []chunk.Chunk) error { return nil }
+func (m *mockChunkStore) PutOne(ctx context.Context, from, through model.Time, chunk chunk.Chunk) error {
+	return nil
+}
+func (m *mockChunkStore) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error) {
+	return nil, nil
+}
+func (m *mockChunkStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
+	return nil, nil
+}
+func (m *mockChunkStore) Stop() {}
+func (m *mockChunkStore) Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
+	return nil, nil
+}
+
+// PutChunks implements ObjectClient from Fetcher
+func (m *mockChunkStore) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { return nil }
+
+// GetChunks implements ObjectClient from Fetcher
+func (m *mockChunkStore) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
+	var res []chunk.Chunk
+	for _, c := range chunks {
+		for _, sc := range m.chunks {
+			// only returns chunks requested using the external key
+			if c.ExternalKey() == sc.ExternalKey() {
+				res = append(res, sc)
+			}
+		}
+	}
+	return res, nil
+}
+
+func (m *mockChunkStore) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
+	refs := make([]chunk.Chunk, 0, len(m.chunks))
+	// transform real chunks into ref chunks.
+	for _, c := range m.chunks {
+		r, err := chunk.ParseExternalKey("fake", c.ExternalKey())
+		if err != nil {
+			panic(err)
+		}
+		refs = append(refs, r)
+	}
+	f, err := chunk.NewChunkFetcher(cache.Config{}, false, m)
+	if err != nil {
+		panic(err)
+	}
+	return [][]chunk.Chunk{refs}, []*chunk.Fetcher{f}, nil
+}
+
+var streamsFixture = []*logproto.Stream{
+	{
+		Labels: "{foo=\"bar\"}",
+		Entries: []logproto.Entry{
+			{
+				Timestamp: from,
+				Line:      "1",
+			},
+
+			{
+				Timestamp: from.Add(time.Millisecond),
+				Line:      "2",
+			},
+			{
+				Timestamp: from.Add(2 * time.Millisecond),
+				Line:      "3",
+			},
+		},
+	},
+	{
+		Labels: "{foo=\"bar\"}",
+		Entries: []logproto.Entry{
+			{
+				Timestamp: from.Add(2 * time.Millisecond),
+				Line:      "3",
+			},
+			{
+				Timestamp: from.Add(3 * time.Millisecond),
+				Line:      "4",
+			},
+
+			{
+				Timestamp: from.Add(4 * time.Millisecond),
+				Line:      "5",
+			},
+			{
+				Timestamp: from.Add(5 * time.Millisecond),
+				Line:      "6",
+			},
+		},
+	},
+	{
+		Labels: "{foo=\"bazz\"}",
+		Entries: []logproto.Entry{
+			{
+				Timestamp: from,
+				Line:      "1",
+			},
+
+			{
+				Timestamp: from.Add(time.Millisecond),
+				Line:      "2",
+			},
+			{
+				Timestamp: from.Add(2 * time.Millisecond),
+				Line:      "3",
+			},
+		},
+	},
+	{
+		Labels: "{foo=\"bazz\"}",
+		Entries: []logproto.Entry{
+			{
+				Timestamp: from.Add(2 * time.Millisecond),
+				Line:      "3",
+			},
+			{
+				Timestamp: from.Add(3 * time.Millisecond),
+				Line:      "4",
+			},
+
+			{
+				Timestamp: from.Add(4 * time.Millisecond),
+				Line:      "5",
+			},
+			{
+				Timestamp: from.Add(5 * time.Millisecond),
+				Line:      "6",
+			},
+		},
+	},
+}
+var storeFixture = newMockChunkStore(streamsFixture)
-- 
GitLab