Skip to content
Snippets Groups Projects
Commit 29f08ccb authored by Marco Pracucci's avatar Marco Pracucci Committed by Cyril Tovena
Browse files

Fix panic in tailer due to race condition between send() and close() (#986)

parent 5c99fe38
No related branches found
No related tags found
No related merge requests found
......@@ -25,8 +25,11 @@ type tailer struct {
expr logql.Expr
sendChan chan *logproto.Stream
done chan struct{}
closeMtx sync.Mutex
// Signaling channel used to notify once the tailer gets closed
// and the loop and senders should stop
closeChan chan struct{}
closeOnce sync.Once
blockedAt *time.Time
blockedMtx sync.RWMutex
......@@ -54,7 +57,7 @@ func newTailer(orgID, query string, conn logproto.Querier_TailServer) (*tailer,
conn: conn,
droppedStreams: []*logproto.DroppedStream{},
id: generateUniqueID(orgID, query),
done: make(chan struct{}),
closeChan: make(chan struct{}),
expr: expr,
}, nil
}
......@@ -75,7 +78,7 @@ func (t *tailer) loop() {
t.close()
return
}
case <-t.done:
case <-t.closeChan:
return
case stream, ok = <-t.sendChan:
if !ok {
......@@ -147,7 +150,7 @@ func (t *tailer) isWatchingLabels(metric model.Metric) bool {
func (t *tailer) isClosed() bool {
select {
case <-t.done:
case <-t.closeChan:
return true
default:
return false
......@@ -155,18 +158,15 @@ func (t *tailer) isClosed() bool {
}
func (t *tailer) close() {
if t.isClosed() {
return
}
t.closeMtx.Lock()
defer t.closeMtx.Unlock()
if t.isClosed() {
return
}
close(t.done)
close(t.sendChan)
t.closeOnce.Do(func() {
// Signal the close channel
close(t.closeChan)
// We intentionally do not close sendChan in order to avoid a panic on
// send to a just-closed channel. It's OK not to close a channel, since
// it will be eventually garbage collected as soon as no goroutine
// references it anymore, whether it has been closed or not.
})
}
func (t *tailer) blockedSince() *time.Time {
......
package ingester
import (
"math/rand"
"sync"
"testing"
"time"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) {
runs := 100
stream := logproto.Stream{
Labels: `{type="test"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(int64(1), 0), Line: "line 1"},
{Timestamp: time.Unix(int64(2), 0), Line: "line 2"},
},
}
for run := 0; run < runs; run++ {
tailer, err := newTailer("org-id", stream.Labels, nil)
require.NoError(t, err)
require.NotNil(t, tailer)
routines := sync.WaitGroup{}
routines.Add(2)
go assert.NotPanics(t, func() {
defer routines.Done()
time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond)
tailer.send(stream)
})
go assert.NotPanics(t, func() {
defer routines.Done()
time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond)
tailer.close()
})
routines.Wait()
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment