Skip to content
Snippets Groups Projects
Commit a422f394 authored by Tom Wilkie's avatar Tom Wilkie
Browse files

Wait for the right number of streams.


Signed-off-by: default avatarTom Wilkie <tom.wilkie@gmail.com>
parent 6d196322
No related branches found
No related tags found
No related merge requests found
......@@ -116,17 +116,19 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// First we flatten out the request into a list of samples.
// We use the heuristic of 1 sample per TS to size the array.
// We also work out the hash value at the same time.
streams := make([]streamTracker, len(req.Streams))
streams := make([]streamTracker, 0, len(req.Streams))
keys := make([]uint32, 0, len(req.Streams))
var validationErr error
for i, stream := range req.Streams {
for _, stream := range req.Streams {
if err := d.validateLabels(userID, stream.Labels); err != nil {
validationErr = err
continue
}
keys = append(keys, tokenFor(userID, stream.Labels))
streams[i].stream = stream
streams = append(streams, streamTracker{
stream: stream,
})
}
if len(streams) == 0 {
......@@ -150,7 +152,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
tracker := pushTracker{
samplesPending: int32(len(streams)),
samplesPending: int32(len(samplesByIngester)),
done: make(chan struct{}),
err: make(chan error),
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment