diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index e9636ea524cc6ce963dcf1eeeead664b2b88c428..2ee4cb70fa46fbbe700c46cf709f44e4f2ab3d07 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -25,8 +25,11 @@ type tailer struct { expr logql.Expr sendChan chan *logproto.Stream - done chan struct{} - closeMtx sync.Mutex + + // Signaling channel used to notify once the tailer gets closed + // and the loop and senders should stop + closeChan chan struct{} + closeOnce sync.Once blockedAt *time.Time blockedMtx sync.RWMutex @@ -54,7 +57,7 @@ func newTailer(orgID, query string, conn logproto.Querier_TailServer) (*tailer, conn: conn, droppedStreams: []*logproto.DroppedStream{}, id: generateUniqueID(orgID, query), - done: make(chan struct{}), + closeChan: make(chan struct{}), expr: expr, }, nil } @@ -75,7 +78,7 @@ func (t *tailer) loop() { t.close() return } - case <-t.done: + case <-t.closeChan: return case stream, ok = <-t.sendChan: if !ok { @@ -147,7 +150,7 @@ func (t *tailer) isWatchingLabels(metric model.Metric) bool { func (t *tailer) isClosed() bool { select { - case <-t.done: + case <-t.closeChan: return true default: return false @@ -155,18 +158,15 @@ func (t *tailer) isClosed() bool { } func (t *tailer) close() { - if t.isClosed() { - return - } - - t.closeMtx.Lock() - defer t.closeMtx.Unlock() - - if t.isClosed() { - return - } - close(t.done) - close(t.sendChan) + t.closeOnce.Do(func() { + // Signal the close channel + close(t.closeChan) + + // We intentionally do not close sendChan in order to avoid a panic on + // send to a just-closed channel. It's OK not to close a channel, since + // it will be eventually garbage collected as soon as no goroutine + // references it anymore, whether it has been closed or not. + }) } func (t *tailer) blockedSince() *time.Time { diff --git a/pkg/ingester/tailer_test.go b/pkg/ingester/tailer_test.go new file mode 100644 index 0000000000000000000000000000000000000000..da3adf85e909b6c70595db62ecaaa42aaa1b0cd2 --- /dev/null +++ b/pkg/ingester/tailer_test.go @@ -0,0 +1,47 @@ +package ingester + +import ( + "math/rand" + "sync" + "testing" + "time" + + "github.com/grafana/loki/pkg/logproto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) { + runs := 100 + + stream := logproto.Stream{ + Labels: `{type="test"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Unix(int64(1), 0), Line: "line 1"}, + {Timestamp: time.Unix(int64(2), 0), Line: "line 2"}, + }, + } + + for run := 0; run < runs; run++ { + tailer, err := newTailer("org-id", stream.Labels, nil) + require.NoError(t, err) + require.NotNil(t, tailer) + + routines := sync.WaitGroup{} + routines.Add(2) + + go assert.NotPanics(t, func() { + defer routines.Done() + time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond) + tailer.send(stream) + }) + + go assert.NotPanics(t, func() { + defer routines.Done() + time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond) + tailer.close() + }) + + routines.Wait() + } +}