Skip to content
Snippets Groups Projects
Commit 714e5d35 authored by Tom Wilkie's avatar Tom Wilkie
Browse files

Honor limit in ingesters


Signed-off-by: default avatarTom Wilkie <tom.wilkie@gmail.com>
parent f35409c7
No related branches found
No related tags found
No related merge requests found
......@@ -10,6 +10,7 @@ import (
"github.com/grafana/logish/pkg/logproto"
"github.com/grafana/logish/pkg/parser"
"github.com/grafana/logish/pkg/querier"
"github.com/grafana/logish/pkg/util"
)
const queryBatchSize = 128
......@@ -80,15 +81,17 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
iterator := querier.NewHeapIterator(iterators)
defer iterator.Close()
return sendBatches(iterator, queryServer)
return sendBatches(iterator, queryServer, req.Limit)
}
func sendBatches(i querier.EntryIterator, queryServer logproto.Querier_QueryServer) error {
for {
batch, err := querier.ReadBatch(i, queryBatchSize)
func sendBatches(i querier.EntryIterator, queryServer logproto.Querier_QueryServer, limit uint32) error {
sent := uint32(0)
for sent < limit {
batch, batchSize, err := querier.ReadBatch(i, util.MinUint32(queryBatchSize, limit-sent))
if err != nil {
return err
}
sent += batchSize
if len(batch.Streams) == 0 {
return nil
......@@ -98,4 +101,5 @@ func sendBatches(i querier.EntryIterator, queryServer logproto.Querier_QueryServ
return err
}
}
return nil
}
......@@ -104,13 +104,14 @@ func (q *Querier) Query(ctx context.Context, req *logproto.QueryRequest) (*logpr
iterator := NewHeapIterator(iterators)
defer iterator.Close()
return ReadBatch(iterator, req.Limit)
resp, _, err := ReadBatch(iterator, req.Limit)
return resp, err
}
func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, error) {
func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, uint32, error) {
streams := map[string]*logproto.Stream{}
for respSize := uint32(0); respSize < size && i.Next(); respSize++ {
respSize := uint32(0)
for ; respSize < size && i.Next(); respSize++ {
labels, entry := i.Labels(), i.Entry()
stream, ok := streams[labels]
if !ok {
......@@ -128,7 +129,7 @@ func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, error) {
for _, stream := range streams {
result.Streams = append(result.Streams, stream)
}
return &result, i.Error()
return &result, respSize, i.Error()
}
// Check implements the grpc healthcheck
......
package util
func MinUint32(a, b uint32) uint32 {
if a < b {
return a
}
return b
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment