diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 9a48723710517c20a092bb87b19ec1084e2dc6a3..52f264cc94429023a69af426b49f5d354e058891 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -165,10 +165,8 @@ func (i *Ingester) transferOut(ctx context.Context) error { ctx = user.InjectOrgID(ctx, "-1") stream, err := ic.TransferChunks(ctx) - - _, err = stream.CloseAndRecv() if err != nil { - return errors.Wrap(err, "CloseAndRecv") + return errors.Wrap(err, "TransferChunks") } for instanceID, inst := range i.instances { @@ -191,17 +189,26 @@ func (i *Ingester) transferOut(ctx context.Context) error { lbls = append(lbls, &logproto.LabelPair{Name: lbl.Name, Value: lbl.Value}) } - stream.Send(&logproto.TimeSeriesChunk{ + err := stream.Send(&logproto.TimeSeriesChunk{ Chunks: chunks, UserId: instanceID, Labels: lbls, FromIngesterId: i.lifecycler.ID, }) + if err != nil { + level.Error(util.Logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err) + return err + } sentChunks.Add(float64(len(chunks))) } } + _, err = stream.CloseAndRecv() + if err != nil { + return errors.Wrap(err, "CloseAndRecv") + } + for _, flushQueue := range i.flushQueues { flushQueue.DiscardAndClose() }