From 2eb666b0a8b53fb24e315b29a84c2bfdffa33532 Mon Sep 17 00:00:00 2001 From: Robert Fratto <robert.fratto@grafana.com> Date: Mon, 22 Jul 2019 14:04:01 -0400 Subject: [PATCH] ingester: fix lint issues for chunk transfers --- pkg/ingester/transfer.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 9a487237..52f264cc 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -165,10 +165,8 @@ func (i *Ingester) transferOut(ctx context.Context) error { ctx = user.InjectOrgID(ctx, "-1") stream, err := ic.TransferChunks(ctx) - - _, err = stream.CloseAndRecv() if err != nil { - return errors.Wrap(err, "CloseAndRecv") + return errors.Wrap(err, "TransferChunks") } for instanceID, inst := range i.instances { @@ -191,17 +189,26 @@ func (i *Ingester) transferOut(ctx context.Context) error { lbls = append(lbls, &logproto.LabelPair{Name: lbl.Name, Value: lbl.Value}) } - stream.Send(&logproto.TimeSeriesChunk{ + err := stream.Send(&logproto.TimeSeriesChunk{ Chunks: chunks, UserId: instanceID, Labels: lbls, FromIngesterId: i.lifecycler.ID, }) + if err != nil { + level.Error(util.Logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err) + return err + } sentChunks.Add(float64(len(chunks))) } } + _, err = stream.CloseAndRecv() + if err != nil { + return errors.Wrap(err, "CloseAndRecv") + } + for _, flushQueue := range i.flushQueues { flushQueue.DiscardAndClose() } -- GitLab