From b707818bab1ecc49928d4217640b11e5ae4866e4 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani <sandeep.d.sukhani@gmail.com> Date: Thu, 9 May 2019 12:02:04 +0530 Subject: [PATCH] Some code refactoring in live log tailing --- cmd/logcli/tail.go | 29 +- pkg/distributor/distributor.go | 2 +- pkg/ingester/instance.go | 6 +- pkg/ingester/tailer.go | 87 ++- pkg/logproto/logproto.pb.go | 1088 ++++++++++++++++++++++++++++++-- pkg/logql/ast.go | 9 + pkg/querier/http.go | 31 +- pkg/querier/querier.go | 21 +- pkg/querier/tail.go | 139 ++-- 9 files changed, 1211 insertions(+), 201 deletions(-) diff --git a/cmd/logcli/tail.go b/cmd/logcli/tail.go index aef59439..72511a45 100644 --- a/cmd/logcli/tail.go +++ b/cmd/logcli/tail.go @@ -3,28 +3,17 @@ package main import ( "fmt" "log" - "time" - "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/querier" ) -type droppedEntry struct { - Timestamp time.Time - Labels string -} - -type tailResponse struct { - Stream logproto.Stream - DroppedEntries []droppedEntry -} - func tailQuery() { conn, err := liveTailQueryConn() if err != nil { log.Fatalf("Tailing logs failed: %+v", err) } - resp := new(tailResponse) + resp := new(querier.TailResponse) for { err := conn.ReadJSON(resp) @@ -32,13 +21,15 @@ func tailQuery() { log.Println("Error reading stream:", err) return } + for _, stream := range resp.Stream { - labels := "" - if !*noLabels { - labels = resp.Stream.Labels - } - for _, entry := range resp.Stream.Entries { - printLogEntry(entry.Timestamp, labels, entry.Line) + labels := "" + if !*noLabels { + labels = stream.Labels + } + for _, entry := range stream.Entries { + printLogEntry(entry.Timestamp, labels, entry.Line) + } } if len(resp.DroppedEntries) != 0 { for _, d := range resp.DroppedEntries { diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 0a13b00e..107cdfbf 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -129,7 +129,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log continue } - keys = append(keys, tokenFor(userID, stream.Labels)) + keys = append(keys, util.TokenFor(userID, stream.Labels)) streams = append(streams, streamTracker{ stream: stream, }) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index b6967adb..83f3062f 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -170,13 +170,13 @@ func (i *instance) lookupStreams(req *logproto.QueryRequest, matchers []*labels. } func (i *instance) addTailer(t *tailer) { - i.tailerMtx.Lock() - defer i.tailerMtx.Unlock() - tailers := map[uint32]*tailer{t.getID(): t} for _, stream := range i.streams { i.addTailersToStream(tailers, stream) } + + i.tailerMtx.Lock() + defer i.tailerMtx.Unlock() i.tailers[t.getID()] = t } diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index f94d91b4..72e16dd5 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -11,14 +11,12 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log/level" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/parser" + "github.com/grafana/loki/pkg/logql" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" ) -// This is to limit size of dropped stream to make it easier for querier to query. -// While dropping stream we divide it into batches and use start and end time of each batch to build dropped stream metadata -const maxDroppedStreamSize = 10000 +const bufferSizeForTailResponse = 5 type tailer struct { id uint32 @@ -26,8 +24,9 @@ type tailer struct { matchers []*labels.Matcher regexp *regexp.Regexp - sendChan chan *logproto.Stream - closed bool + sendChan chan *logproto.Stream + closed bool + closedMtx sync.RWMutex blockedAt *time.Time blockedMtx sync.RWMutex @@ -37,12 +36,14 @@ type tailer struct { } func newTailer(orgID, query, regex string, conn logproto.Querier_TailServer) (*tailer, error) { - matchers, err := parser.Matchers(query) + expr, err := logql.ParseExpr(query) if err != nil { return nil, err } + matchers := expr.Matchers() var re *regexp.Regexp + if regex != "" { re, err = regexp.Compile(regex) if err != nil { @@ -54,7 +55,7 @@ func newTailer(orgID, query, regex string, conn logproto.Querier_TailServer) (*t orgID: orgID, matchers: matchers, regexp: re, - sendChan: make(chan *logproto.Stream, 2), + sendChan: make(chan *logproto.Stream, bufferSizeForTailResponse), conn: conn, droppedStreams: []*logproto.DroppedStream{}, id: generateUniqueID(orgID, query, regex), @@ -64,16 +65,19 @@ func newTailer(orgID, query, regex string, conn logproto.Querier_TailServer) (*t func (t *tailer) loop() { var stream *logproto.Stream var err error + var ok bool for { - if t.closed { + if t.isClosed() { return } - stream = <-t.sendChan - if stream == nil { + stream, ok = <-t.sendChan + if !ok { t.close() return + } else if stream == nil { + continue } // while sending new stream pop lined up dropped streams metadata for sending to querier @@ -88,7 +92,7 @@ func (t *tailer) loop() { } func (t *tailer) send(stream logproto.Stream) { - if t.closed { + if t.isClosed() { return } @@ -130,16 +134,8 @@ func (t *tailer) filterEntriesInStream(stream *logproto.Stream) { } func (t *tailer) isWatchingLabels(metric model.Metric) bool { - var labelValue model.LabelValue - var ok bool - for _, matcher := range t.matchers { - labelValue, ok = metric[model.LabelName(matcher.Name)] - if !ok { - return false - } - - if !matcher.Matches(string(labelValue)) { + if !matcher.Matches(string(metric[model.LabelName(matcher.Name)])) { return false } } @@ -148,10 +144,16 @@ func (t *tailer) isWatchingLabels(metric model.Metric) bool { } func (t *tailer) isClosed() bool { + t.closedMtx.RLock() + defer t.closedMtx.RUnlock() + return t.closed } func (t *tailer) close() { + t.closedMtx.Lock() + defer t.closedMtx.Unlock() + t.closed = true } @@ -163,13 +165,23 @@ func (t *tailer) blockedSince() *time.Time { } func (t *tailer) dropStream(stream logproto.Stream) { + if len(stream.Entries) == 0 { + return + } + t.blockedMtx.Lock() defer t.blockedMtx.Unlock() - blockedAt := new(time.Time) - *blockedAt = time.Now() - t.blockedAt = blockedAt - t.droppedStreams = append(t.droppedStreams, breakDroppedStream(stream)...) + if t.blockedAt == nil { + blockedAt := time.Now() + t.blockedAt = &blockedAt + } + droppedStream := logproto.DroppedStream{ + From: stream.Entries[0].Timestamp, + To: stream.Entries[len(stream.Entries)-1].Timestamp, + Labels: stream.Labels, + } + t.droppedStreams = append(t.droppedStreams, &droppedStream) } func (t *tailer) popDroppedStreams() []*logproto.DroppedStream { @@ -191,31 +203,6 @@ func (t *tailer) getID() uint32 { return t.id } -func breakDroppedStream(stream logproto.Stream) []*logproto.DroppedStream { - streamLength := len(stream.Entries) - numberOfBreaks := streamLength / maxDroppedStreamSize - - if streamLength%maxDroppedStreamSize != 0 { - numberOfBreaks++ - } - droppedStreams := make([]*logproto.DroppedStream, numberOfBreaks) - for i := 0; i < numberOfBreaks; i++ { - droppedStream := new(logproto.DroppedStream) - droppedStream.From = stream.Entries[i*maxDroppedStreamSize].Timestamp - droppedStream.To = stream.Entries[((i+1)*maxDroppedStreamSize)-1].Timestamp - droppedStream.Labels = stream.Labels - droppedStreams[i] = droppedStream - } - - if streamLength%maxDroppedStreamSize != 0 { - droppedStream := new(logproto.DroppedStream) - droppedStream.From = stream.Entries[numberOfBreaks*maxDroppedStreamSize].Timestamp - droppedStream.To = stream.Entries[streamLength-1].Timestamp - droppedStreams[numberOfBreaks] = droppedStream - } - return droppedStreams -} - // An id is useful in managing tailer instances func generateUniqueID(orgID, query, regex string) uint32 { uniqueID := fnv.New32() diff --git a/pkg/logproto/logproto.pb.go b/pkg/logproto/logproto.pb.go index c7598067..d2c0e3ed 100644 --- a/pkg/logproto/logproto.pb.go +++ b/pkg/logproto/logproto.pb.go @@ -452,6 +452,175 @@ func (m *Entry) GetLine() string { return "" } +type TailRequest struct { + Query string `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` + Regex string `protobuf:"bytes,2,opt,name=regex,proto3" json:"regex,omitempty"` + DelayFor uint32 `protobuf:"varint,3,opt,name=delayFor,proto3" json:"delayFor,omitempty"` +} + +func (m *TailRequest) Reset() { *m = TailRequest{} } +func (*TailRequest) ProtoMessage() {} +func (*TailRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_7a8976f235a02f79, []int{8} +} +func (m *TailRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TailRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TailRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TailRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_TailRequest.Merge(m, src) +} +func (m *TailRequest) XXX_Size() int { + return m.Size() +} +func (m *TailRequest) XXX_DiscardUnknown() { + xxx_messageInfo_TailRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_TailRequest proto.InternalMessageInfo + +func (m *TailRequest) GetQuery() string { + if m != nil { + return m.Query + } + return "" +} + +func (m *TailRequest) GetRegex() string { + if m != nil { + return m.Regex + } + return "" +} + +func (m *TailRequest) GetDelayFor() uint32 { + if m != nil { + return m.DelayFor + } + return 0 +} + +type TailResponse struct { + Stream *Stream `protobuf:"bytes,1,opt,name=stream,proto3" json:"stream,omitempty"` + DroppedStreams []*DroppedStream `protobuf:"bytes,2,rep,name=droppedStreams,proto3" json:"droppedStreams,omitempty"` +} + +func (m *TailResponse) Reset() { *m = TailResponse{} } +func (*TailResponse) ProtoMessage() {} +func (*TailResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_7a8976f235a02f79, []int{9} +} +func (m *TailResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TailResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TailResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TailResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_TailResponse.Merge(m, src) +} +func (m *TailResponse) XXX_Size() int { + return m.Size() +} +func (m *TailResponse) XXX_DiscardUnknown() { + xxx_messageInfo_TailResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_TailResponse proto.InternalMessageInfo + +func (m *TailResponse) GetStream() *Stream { + if m != nil { + return m.Stream + } + return nil +} + +func (m *TailResponse) GetDroppedStreams() []*DroppedStream { + if m != nil { + return m.DroppedStreams + } + return nil +} + +type DroppedStream struct { + From time.Time `protobuf:"bytes,1,opt,name=from,proto3,stdtime" json:"from"` + To time.Time `protobuf:"bytes,2,opt,name=to,proto3,stdtime" json:"to"` + Labels string `protobuf:"bytes,3,opt,name=labels,proto3" json:"labels,omitempty"` +} + +func (m *DroppedStream) Reset() { *m = DroppedStream{} } +func (*DroppedStream) ProtoMessage() {} +func (*DroppedStream) Descriptor() ([]byte, []int) { + return fileDescriptor_7a8976f235a02f79, []int{10} +} +func (m *DroppedStream) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *DroppedStream) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_DroppedStream.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *DroppedStream) XXX_Merge(src proto.Message) { + xxx_messageInfo_DroppedStream.Merge(m, src) +} +func (m *DroppedStream) XXX_Size() int { + return m.Size() +} +func (m *DroppedStream) XXX_DiscardUnknown() { + xxx_messageInfo_DroppedStream.DiscardUnknown(m) +} + +var xxx_messageInfo_DroppedStream proto.InternalMessageInfo + +func (m *DroppedStream) GetFrom() time.Time { + if m != nil { + return m.From + } + return time.Time{} +} + +func (m *DroppedStream) GetTo() time.Time { + if m != nil { + return m.To + } + return time.Time{} +} + +func (m *DroppedStream) GetLabels() string { + if m != nil { + return m.Labels + } + return "" +} + func init() { proto.RegisterEnum("logproto.Direction", Direction_name, Direction_value) proto.RegisterType((*PushRequest)(nil), "logproto.PushRequest") @@ -462,50 +631,61 @@ func init() { proto.RegisterType((*LabelResponse)(nil), "logproto.LabelResponse") proto.RegisterType((*Stream)(nil), "logproto.Stream") proto.RegisterType((*Entry)(nil), "logproto.Entry") + proto.RegisterType((*TailRequest)(nil), "logproto.TailRequest") + proto.RegisterType((*TailResponse)(nil), "logproto.TailResponse") + proto.RegisterType((*DroppedStream)(nil), "logproto.DroppedStream") } func init() { proto.RegisterFile("logproto.proto", fileDescriptor_7a8976f235a02f79) } var fileDescriptor_7a8976f235a02f79 = []byte{ - // 601 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0x4f, 0x6f, 0xd3, 0x4e, - 0x10, 0xf5, 0xb6, 0x89, 0x13, 0x4f, 0xd2, 0xb4, 0xda, 0xdf, 0x8f, 0x62, 0x45, 0x68, 0x1d, 0xf9, - 0x00, 0x51, 0x25, 0x5c, 0x08, 0x88, 0x4a, 0x85, 0x4b, 0x4d, 0xa9, 0x90, 0x40, 0x02, 0x16, 0x24, - 0xce, 0x4e, 0xbb, 0xb8, 0x96, 0xfc, 0xa7, 0xb5, 0xd7, 0x88, 0xde, 0x90, 0xf8, 0x02, 0xfd, 0x18, - 0x7c, 0x94, 0x1e, 0x73, 0xec, 0x29, 0x10, 0xe7, 0x82, 0x72, 0xea, 0x8d, 0x2b, 0xda, 0xb5, 0x1d, - 0x1b, 0x90, 0x40, 0x5c, 0x9c, 0x79, 0xbb, 0xef, 0xcd, 0xec, 0x9b, 0x99, 0x40, 0xcf, 0x8f, 0xdc, - 0x93, 0x38, 0xe2, 0x91, 0x25, 0xbf, 0xb8, 0x5d, 0xe2, 0xbe, 0xe1, 0x46, 0x91, 0xeb, 0xb3, 0x6d, - 0x89, 0xc6, 0xe9, 0xbb, 0x6d, 0xee, 0x05, 0x2c, 0xe1, 0x4e, 0x70, 0x92, 0x53, 0xfb, 0xb7, 0x5d, - 0x8f, 0x1f, 0xa7, 0x63, 0xeb, 0x30, 0x0a, 0xb6, 0xdd, 0xc8, 0x8d, 0x2a, 0xa6, 0x40, 0x12, 0xc8, - 0x28, 0xa7, 0x9b, 0x07, 0xd0, 0x79, 0x99, 0x26, 0xc7, 0x94, 0x9d, 0xa6, 0x2c, 0xe1, 0x78, 0x07, - 0x5a, 0x09, 0x8f, 0x99, 0x13, 0x24, 0x3a, 0x1a, 0xac, 0x0e, 0x3b, 0xa3, 0x0d, 0x6b, 0xf9, 0x94, - 0xd7, 0xf2, 0xc2, 0xee, 0x2c, 0xa6, 0x46, 0x49, 0xa2, 0x65, 0x60, 0xf6, 0xa0, 0x9b, 0xe7, 0x49, - 0x4e, 0xa2, 0x30, 0x61, 0xe6, 0x77, 0x04, 0xdd, 0x57, 0x29, 0x8b, 0xcf, 0xca, 0xcc, 0xff, 0x43, - 0xf3, 0x54, 0x60, 0x1d, 0x0d, 0xd0, 0x50, 0xa3, 0x39, 0x10, 0xa7, 0xbe, 0x17, 0x78, 0x5c, 0x5f, - 0x19, 0xa0, 0xe1, 0x1a, 0xcd, 0x01, 0xde, 0x85, 0x66, 0xc2, 0x9d, 0x98, 0xeb, 0xab, 0x03, 0x34, - 0xec, 0x8c, 0xfa, 0x56, 0x6e, 0xda, 0x2a, 0xad, 0x58, 0x6f, 0x4a, 0xd3, 0x76, 0xfb, 0x62, 0x6a, - 0x28, 0xe7, 0x5f, 0x0c, 0x44, 0x73, 0x09, 0x7e, 0x00, 0xab, 0x2c, 0x3c, 0xd2, 0x1b, 0xff, 0xa0, - 0x14, 0x02, 0x7c, 0x17, 0xb4, 0x23, 0x2f, 0x66, 0x87, 0xdc, 0x8b, 0x42, 0xbd, 0x39, 0x40, 0xc3, - 0xde, 0xe8, 0xbf, 0xca, 0xfb, 0x7e, 0x79, 0x45, 0x2b, 0x96, 0x78, 0x7c, 0xcc, 0x5c, 0xf6, 0x41, - 0x57, 0x73, 0x4b, 0x12, 0x98, 0x0f, 0x61, 0xad, 0x30, 0x9e, 0xb7, 0x02, 0x6f, 0xfd, 0xb5, 0xa7, - 0x55, 0x1b, 0x77, 0xa1, 0xfb, 0xdc, 0x19, 0x33, 0xbf, 0xec, 0x1a, 0x86, 0x46, 0xe8, 0x04, 0xac, - 0x68, 0x9a, 0x8c, 0xf1, 0x26, 0xa8, 0xef, 0x1d, 0x3f, 0x65, 0x89, 0x6c, 0x5a, 0x9b, 0x16, 0xc8, - 0xbc, 0x05, 0x6b, 0x85, 0xb6, 0x28, 0x5c, 0x11, 0x45, 0x5d, 0x6d, 0x49, 0x3c, 0x06, 0x35, 0xaf, - 0x8b, 0x4d, 0x50, 0x7d, 0x21, 0x49, 0xf2, 0x02, 0x36, 0x2c, 0xa6, 0x46, 0x71, 0x42, 0x8b, 0x5f, - 0xbc, 0x0b, 0x2d, 0x16, 0xf2, 0xd8, 0x93, 0xf5, 0xc4, 0xf3, 0xd7, 0xab, 0xe7, 0x3f, 0x09, 0x79, - 0x7c, 0x66, 0xaf, 0x8b, 0x4e, 0x8a, 0xad, 0x28, 0x78, 0xb4, 0x0c, 0xcc, 0x08, 0x9a, 0x92, 0x82, - 0x9f, 0x82, 0xb6, 0x5c, 0x54, 0x59, 0xeb, 0xcf, 0xb3, 0xe9, 0x15, 0x19, 0x57, 0x78, 0x22, 0x27, - 0x54, 0x89, 0xf1, 0x0d, 0x68, 0xf8, 0x5e, 0xc8, 0xa4, 0x77, 0xcd, 0x6e, 0x2f, 0xa6, 0x86, 0xc4, - 0x54, 0x7e, 0xb7, 0x6e, 0x82, 0xb6, 0x1c, 0x15, 0xee, 0x40, 0xeb, 0xe0, 0x05, 0x7d, 0xbb, 0x47, - 0xf7, 0x37, 0x14, 0xdc, 0x85, 0xb6, 0xbd, 0xf7, 0xf8, 0x99, 0x44, 0x68, 0xb4, 0x07, 0xaa, 0x58, - 0x57, 0x16, 0xe3, 0x1d, 0x68, 0x88, 0x08, 0x5f, 0xab, 0x5c, 0xd5, 0xfe, 0x10, 0xfd, 0xcd, 0x5f, - 0x8f, 0x8b, 0xfd, 0x56, 0x46, 0x9f, 0x10, 0xb4, 0xc4, 0xa0, 0x3d, 0x16, 0xe3, 0x47, 0xd0, 0x94, - 0x33, 0xc7, 0x35, 0x7a, 0x7d, 0xfb, 0xfb, 0xd7, 0x7f, 0x3b, 0x2f, 0xf3, 0xdc, 0x41, 0x62, 0xdd, - 0xe5, 0xe0, 0xea, 0xea, 0xfa, 0x16, 0xd4, 0xd5, 0x3f, 0x4d, 0xd8, 0x54, 0xec, 0xfb, 0x93, 0x19, - 0x51, 0x2e, 0x67, 0x44, 0xb9, 0x9a, 0x11, 0xf4, 0x31, 0x23, 0xe8, 0x73, 0x46, 0xd0, 0x45, 0x46, - 0xd0, 0x24, 0x23, 0xe8, 0x6b, 0x46, 0xd0, 0xb7, 0x8c, 0x28, 0x57, 0x19, 0x41, 0xe7, 0x73, 0xa2, - 0x4c, 0xe6, 0x44, 0xb9, 0x9c, 0x13, 0x65, 0xac, 0xca, 0x64, 0xf7, 0x7e, 0x04, 0x00, 0x00, 0xff, - 0xff, 0x47, 0x69, 0x1e, 0x88, 0x68, 0x04, 0x00, 0x00, + // 728 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x4d, 0x4f, 0x13, 0x4f, + 0x18, 0xdf, 0xe9, 0x7b, 0x9f, 0xbe, 0x40, 0xe6, 0xff, 0x17, 0x9a, 0xc6, 0x6c, 0x9b, 0x3d, 0x68, + 0x43, 0x62, 0xd1, 0x4a, 0x44, 0xd1, 0xc4, 0x50, 0x91, 0x98, 0x68, 0xa2, 0x0e, 0x18, 0xcf, 0x5b, + 0x3a, 0x94, 0x4d, 0xb6, 0x3b, 0x65, 0x77, 0x6a, 0xec, 0xcd, 0x8f, 0xc0, 0xcd, 0xaf, 0xe0, 0xa7, + 0xf0, 0xcc, 0x91, 0x23, 0xa7, 0x2a, 0xcb, 0xc5, 0x70, 0xe2, 0xe6, 0xd5, 0xcc, 0xcb, 0x76, 0x17, + 0x48, 0x04, 0x2f, 0xbb, 0xf3, 0x9b, 0xf9, 0x3d, 0xf3, 0xbc, 0xfd, 0x9e, 0x81, 0xaa, 0xcb, 0x06, + 0x23, 0x9f, 0x71, 0xd6, 0x96, 0x5f, 0x5c, 0x88, 0x70, 0xbd, 0x31, 0x60, 0x6c, 0xe0, 0xd2, 0x65, + 0x89, 0x7a, 0xe3, 0xdd, 0x65, 0xee, 0x0c, 0x69, 0xc0, 0xed, 0xe1, 0x48, 0x51, 0xeb, 0xf7, 0x06, + 0x0e, 0xdf, 0x1b, 0xf7, 0xda, 0x3b, 0x6c, 0xb8, 0x3c, 0x60, 0x03, 0x16, 0x33, 0x05, 0x92, 0x40, + 0xae, 0x14, 0xdd, 0xda, 0x84, 0xd2, 0xbb, 0x71, 0xb0, 0x47, 0xe8, 0xfe, 0x98, 0x06, 0x1c, 0xaf, + 0x42, 0x3e, 0xe0, 0x3e, 0xb5, 0x87, 0x41, 0x0d, 0x35, 0xd3, 0xad, 0x52, 0x67, 0xbe, 0x3d, 0x0b, + 0x65, 0x4b, 0x1e, 0x74, 0x4b, 0x67, 0xd3, 0x46, 0x44, 0x22, 0xd1, 0xc2, 0xaa, 0x42, 0x59, 0xdd, + 0x13, 0x8c, 0x98, 0x17, 0x50, 0xeb, 0x37, 0x82, 0xf2, 0xfb, 0x31, 0xf5, 0x27, 0xd1, 0xcd, 0xff, + 0x43, 0x76, 0x5f, 0xe0, 0x1a, 0x6a, 0xa2, 0x56, 0x91, 0x28, 0x20, 0x76, 0x5d, 0x67, 0xe8, 0xf0, + 0x5a, 0xaa, 0x89, 0x5a, 0x15, 0xa2, 0x00, 0x5e, 0x83, 0x6c, 0xc0, 0x6d, 0x9f, 0xd7, 0xd2, 0x4d, + 0xd4, 0x2a, 0x75, 0xea, 0x6d, 0x95, 0x74, 0x3b, 0x4a, 0xa5, 0xbd, 0x1d, 0x25, 0xdd, 0x2d, 0x1c, + 0x4e, 0x1b, 0xc6, 0xc1, 0x8f, 0x06, 0x22, 0xca, 0x04, 0x3f, 0x82, 0x34, 0xf5, 0xfa, 0xb5, 0xcc, + 0x3f, 0x58, 0x0a, 0x03, 0xfc, 0x00, 0x8a, 0x7d, 0xc7, 0xa7, 0x3b, 0xdc, 0x61, 0x5e, 0x2d, 0xdb, + 0x44, 0xad, 0x6a, 0xe7, 0xbf, 0x38, 0xf7, 0x8d, 0xe8, 0x88, 0xc4, 0x2c, 0x11, 0xbc, 0x4f, 0x07, + 0xf4, 0x73, 0x2d, 0xa7, 0x52, 0x92, 0xc0, 0x7a, 0x0a, 0x15, 0x9d, 0xb8, 0x2a, 0x05, 0x5e, 0xba, + 0xb6, 0xa6, 0x71, 0x19, 0xd7, 0xa0, 0xfc, 0xc6, 0xee, 0x51, 0x37, 0xaa, 0x1a, 0x86, 0x8c, 0x67, + 0x0f, 0xa9, 0x2e, 0x9a, 0x5c, 0xe3, 0x05, 0xc8, 0x7d, 0xb2, 0xdd, 0x31, 0x0d, 0x64, 0xd1, 0x0a, + 0x44, 0x23, 0xeb, 0x2e, 0x54, 0xb4, 0xad, 0x76, 0x1c, 0x13, 0x85, 0xdf, 0xe2, 0x8c, 0xb8, 0x07, + 0x39, 0xe5, 0x17, 0x5b, 0x90, 0x73, 0x85, 0x49, 0xa0, 0x1c, 0x74, 0xe1, 0x6c, 0xda, 0xd0, 0x3b, + 0x44, 0xff, 0xf1, 0x1a, 0xe4, 0xa9, 0xc7, 0x7d, 0x47, 0xfa, 0x13, 0xe1, 0xcf, 0xc5, 0xe1, 0xbf, + 0xf4, 0xb8, 0x3f, 0xe9, 0xce, 0x89, 0x4a, 0x0a, 0x55, 0x68, 0x1e, 0x89, 0x16, 0x16, 0x83, 0xac, + 0xa4, 0xe0, 0x57, 0x50, 0x9c, 0x09, 0x55, 0xfa, 0xfa, 0x7b, 0x6f, 0xaa, 0xfa, 0xc6, 0x14, 0x0f, + 0x64, 0x87, 0x62, 0x63, 0x7c, 0x1b, 0x32, 0xae, 0xe3, 0x51, 0x99, 0x7b, 0xb1, 0x5b, 0x38, 0x9b, + 0x36, 0x24, 0x26, 0xf2, 0x6b, 0x7d, 0x80, 0xd2, 0xb6, 0xed, 0xb8, 0xd7, 0x8a, 0x4e, 0xf5, 0x2d, + 0x95, 0xe8, 0x1b, 0xae, 0x43, 0xa1, 0x4f, 0x5d, 0x7b, 0xb2, 0xc9, 0x7c, 0xa9, 0xbb, 0x0a, 0x99, + 0x61, 0x6b, 0x02, 0x65, 0x75, 0xad, 0xae, 0x6c, 0x0b, 0x72, 0xaa, 0x63, 0x3a, 0x97, 0xab, 0x1d, + 0xd5, 0xe7, 0xf8, 0x39, 0x54, 0xfb, 0x3e, 0x1b, 0x8d, 0x68, 0x7f, 0x4b, 0x6b, 0x40, 0x15, 0x71, + 0x31, 0xa1, 0xad, 0xe4, 0x39, 0xb9, 0x44, 0xb7, 0xbe, 0x22, 0xa8, 0x5c, 0x60, 0xe0, 0xc7, 0x90, + 0xd9, 0xf5, 0xd9, 0xf0, 0x06, 0x65, 0x8c, 0x25, 0x2e, 0x2d, 0xf0, 0x0a, 0xa4, 0x38, 0x93, 0x59, + 0xdf, 0xd4, 0x2e, 0xc5, 0x99, 0x90, 0x91, 0x16, 0x49, 0x5a, 0xd6, 0x4b, 0xa3, 0xa5, 0x3b, 0x50, + 0x9c, 0x8d, 0x05, 0x2e, 0x41, 0x7e, 0xf3, 0x2d, 0xf9, 0xb8, 0x4e, 0x36, 0xe6, 0x0d, 0x5c, 0x86, + 0x42, 0x77, 0xfd, 0xc5, 0x6b, 0x89, 0x50, 0x67, 0x1d, 0x72, 0xe2, 0x69, 0xa0, 0x3e, 0x5e, 0x85, + 0x8c, 0x58, 0xe1, 0x5b, 0x71, 0xf2, 0x89, 0xc7, 0xa7, 0xbe, 0x70, 0x79, 0x5b, 0xbf, 0x25, 0x46, + 0xe7, 0x3b, 0x82, 0xbc, 0x18, 0x2a, 0x87, 0xfa, 0xf8, 0x19, 0x64, 0xe5, 0x7c, 0xe1, 0x04, 0x3d, + 0xf9, 0xd2, 0xd4, 0x17, 0xaf, 0xec, 0x47, 0xf7, 0xdc, 0x47, 0xe2, 0x69, 0x91, 0x43, 0x92, 0xb4, + 0x4e, 0x4e, 0x5c, 0xd2, 0xfa, 0xc2, 0x34, 0x59, 0x06, 0x7e, 0x02, 0x19, 0xa1, 0x82, 0x64, 0xf8, + 0x09, 0xb1, 0x25, 0xc3, 0x4f, 0x8a, 0x45, 0xb8, 0xed, 0xae, 0x1c, 0x9d, 0x98, 0xc6, 0xf1, 0x89, + 0x69, 0x9c, 0x9f, 0x98, 0xe8, 0x4b, 0x68, 0xa2, 0x6f, 0xa1, 0x89, 0x0e, 0x43, 0x13, 0x1d, 0x85, + 0x26, 0xfa, 0x19, 0x9a, 0xe8, 0x57, 0x68, 0x1a, 0xe7, 0xa1, 0x89, 0x0e, 0x4e, 0x4d, 0xe3, 0xe8, + 0xd4, 0x34, 0x8e, 0x4f, 0x4d, 0xa3, 0x97, 0x93, 0xb7, 0x3d, 0xfc, 0x13, 0x00, 0x00, 0xff, 0xff, + 0x37, 0x0c, 0xea, 0x1a, 0x0f, 0x06, 0x00, 0x00, } func (x Direction) String() string { @@ -748,6 +928,98 @@ func (this *Entry) Equal(that interface{}) bool { } return true } +func (this *TailRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*TailRequest) + if !ok { + that2, ok := that.(TailRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Query != that1.Query { + return false + } + if this.Regex != that1.Regex { + return false + } + if this.DelayFor != that1.DelayFor { + return false + } + return true +} +func (this *TailResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*TailResponse) + if !ok { + that2, ok := that.(TailResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !this.Stream.Equal(that1.Stream) { + return false + } + if len(this.DroppedStreams) != len(that1.DroppedStreams) { + return false + } + for i := range this.DroppedStreams { + if !this.DroppedStreams[i].Equal(that1.DroppedStreams[i]) { + return false + } + } + return true +} +func (this *DroppedStream) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*DroppedStream) + if !ok { + that2, ok := that.(DroppedStream) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !this.From.Equal(that1.From) { + return false + } + if !this.To.Equal(that1.To) { + return false + } + if this.Labels != that1.Labels { + return false + } + return true +} func (this *PushRequest) GoString() string { if this == nil { return "nil" @@ -845,6 +1117,45 @@ func (this *Entry) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *TailRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&logproto.TailRequest{") + s = append(s, "Query: "+fmt.Sprintf("%#v", this.Query)+",\n") + s = append(s, "Regex: "+fmt.Sprintf("%#v", this.Regex)+",\n") + s = append(s, "DelayFor: "+fmt.Sprintf("%#v", this.DelayFor)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *TailResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&logproto.TailResponse{") + if this.Stream != nil { + s = append(s, "Stream: "+fmt.Sprintf("%#v", this.Stream)+",\n") + } + if this.DroppedStreams != nil { + s = append(s, "DroppedStreams: "+fmt.Sprintf("%#v", this.DroppedStreams)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *DroppedStream) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&logproto.DroppedStream{") + s = append(s, "From: "+fmt.Sprintf("%#v", this.From)+",\n") + s = append(s, "To: "+fmt.Sprintf("%#v", this.To)+",\n") + s = append(s, "Labels: "+fmt.Sprintf("%#v", this.Labels)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} func valueToGoStringLogproto(v interface{}, typ string) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -932,6 +1243,7 @@ var _Pusher_serviceDesc = grpc.ServiceDesc{ type QuerierClient interface { Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (Querier_QueryClient, error) Label(ctx context.Context, in *LabelRequest, opts ...grpc.CallOption) (*LabelResponse, error) + Tail(ctx context.Context, in *TailRequest, opts ...grpc.CallOption) (Querier_TailClient, error) } type querierClient struct { @@ -983,10 +1295,43 @@ func (c *querierClient) Label(ctx context.Context, in *LabelRequest, opts ...grp return out, nil } +func (c *querierClient) Tail(ctx context.Context, in *TailRequest, opts ...grpc.CallOption) (Querier_TailClient, error) { + stream, err := c.cc.NewStream(ctx, &_Querier_serviceDesc.Streams[1], "/logproto.Querier/Tail", opts...) + if err != nil { + return nil, err + } + x := &querierTailClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Querier_TailClient interface { + Recv() (*TailResponse, error) + grpc.ClientStream +} + +type querierTailClient struct { + grpc.ClientStream +} + +func (x *querierTailClient) Recv() (*TailResponse, error) { + m := new(TailResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // QuerierServer is the server API for Querier service. type QuerierServer interface { Query(*QueryRequest, Querier_QueryServer) error Label(context.Context, *LabelRequest) (*LabelResponse, error) + Tail(*TailRequest, Querier_TailServer) error } func RegisterQuerierServer(s *grpc.Server, srv QuerierServer) { @@ -1032,6 +1377,27 @@ func _Querier_Label_Handler(srv interface{}, ctx context.Context, dec func(inter return interceptor(ctx, in, info, handler) } +func _Querier_Tail_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(TailRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(QuerierServer).Tail(m, &querierTailServer{stream}) +} + +type Querier_TailServer interface { + Send(*TailResponse) error + grpc.ServerStream +} + +type querierTailServer struct { + grpc.ServerStream +} + +func (x *querierTailServer) Send(m *TailResponse) error { + return x.ServerStream.SendMsg(m) +} + var _Querier_serviceDesc = grpc.ServiceDesc{ ServiceName: "logproto.Querier", HandlerType: (*QuerierServer)(nil), @@ -1047,6 +1413,11 @@ var _Querier_serviceDesc = grpc.ServiceDesc{ Handler: _Querier_Query_Handler, ServerStreams: true, }, + { + StreamName: "Tail", + Handler: _Querier_Tail_Handler, + ServerStreams: true, + }, }, Metadata: "logproto.proto", } @@ -1320,15 +1691,130 @@ func (m *Entry) MarshalTo(dAtA []byte) (int, error) { return i, nil } -func encodeVarintLogproto(dAtA []byte, offset int, v uint64) int { - for v >= 1<<7 { - dAtA[offset] = uint8(v&0x7f | 0x80) - v >>= 7 - offset++ - } - dAtA[offset] = uint8(v) - return offset + 1 -} +func (m *TailRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TailRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Query) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintLogproto(dAtA, i, uint64(len(m.Query))) + i += copy(dAtA[i:], m.Query) + } + if len(m.Regex) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintLogproto(dAtA, i, uint64(len(m.Regex))) + i += copy(dAtA[i:], m.Regex) + } + if m.DelayFor != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintLogproto(dAtA, i, uint64(m.DelayFor)) + } + return i, nil +} + +func (m *TailResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TailResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Stream != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintLogproto(dAtA, i, uint64(m.Stream.Size())) + n4, err4 := m.Stream.MarshalTo(dAtA[i:]) + if err4 != nil { + return 0, err4 + } + i += n4 + } + if len(m.DroppedStreams) > 0 { + for _, msg := range m.DroppedStreams { + dAtA[i] = 0x12 + i++ + i = encodeVarintLogproto(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *DroppedStream) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DroppedStream) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0xa + i++ + i = encodeVarintLogproto(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.From))) + n5, err5 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.From, dAtA[i:]) + if err5 != nil { + return 0, err5 + } + i += n5 + dAtA[i] = 0x12 + i++ + i = encodeVarintLogproto(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.To))) + n6, err6 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.To, dAtA[i:]) + if err6 != nil { + return 0, err6 + } + i += n6 + if len(m.Labels) > 0 { + dAtA[i] = 0x1a + i++ + i = encodeVarintLogproto(dAtA, i, uint64(len(m.Labels))) + i += copy(dAtA[i:], m.Labels) + } + return i, nil +} + +func encodeVarintLogproto(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} func (m *PushRequest) Size() (n int) { if m == nil { return 0 @@ -1460,6 +1946,62 @@ func (m *Entry) Size() (n int) { return n } +func (m *TailRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Query) + if l > 0 { + n += 1 + l + sovLogproto(uint64(l)) + } + l = len(m.Regex) + if l > 0 { + n += 1 + l + sovLogproto(uint64(l)) + } + if m.DelayFor != 0 { + n += 1 + sovLogproto(uint64(m.DelayFor)) + } + return n +} + +func (m *TailResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Stream != nil { + l = m.Stream.Size() + n += 1 + l + sovLogproto(uint64(l)) + } + if len(m.DroppedStreams) > 0 { + for _, e := range m.DroppedStreams { + l = e.Size() + n += 1 + l + sovLogproto(uint64(l)) + } + } + return n +} + +func (m *DroppedStream) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.From) + n += 1 + l + sovLogproto(uint64(l)) + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.To) + n += 1 + l + sovLogproto(uint64(l)) + l = len(m.Labels) + if l > 0 { + n += 1 + l + sovLogproto(uint64(l)) + } + return n +} + func sovLogproto(x uint64) (n int) { for { n++ @@ -1575,6 +2117,46 @@ func (this *Entry) String() string { }, "") return s } +func (this *TailRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&TailRequest{`, + `Query:` + fmt.Sprintf("%v", this.Query) + `,`, + `Regex:` + fmt.Sprintf("%v", this.Regex) + `,`, + `DelayFor:` + fmt.Sprintf("%v", this.DelayFor) + `,`, + `}`, + }, "") + return s +} +func (this *TailResponse) String() string { + if this == nil { + return "nil" + } + repeatedStringForDroppedStreams := "[]*DroppedStream{" + for _, f := range this.DroppedStreams { + repeatedStringForDroppedStreams += strings.Replace(f.String(), "DroppedStream", "DroppedStream", 1) + "," + } + repeatedStringForDroppedStreams += "}" + s := strings.Join([]string{`&TailResponse{`, + `Stream:` + strings.Replace(this.Stream.String(), "Stream", "Stream", 1) + `,`, + `DroppedStreams:` + repeatedStringForDroppedStreams + `,`, + `}`, + }, "") + return s +} +func (this *DroppedStream) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&DroppedStream{`, + `From:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.From), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `To:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.To), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `Labels:` + fmt.Sprintf("%v", this.Labels) + `,`, + `}`, + }, "") + return s +} func valueToStringLogproto(v interface{}) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -2458,6 +3040,416 @@ func (m *Entry) Unmarshal(dAtA []byte) error { } return nil } +func (m *TailRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TailRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TailRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Query", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Query = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Regex", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Regex = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field DelayFor", wireType) + } + m.DelayFor = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.DelayFor |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipLogproto(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *TailResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TailResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TailResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Stream", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Stream == nil { + m.Stream = &Stream{} + } + if err := m.Stream.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DroppedStreams", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DroppedStreams = append(m.DroppedStreams, &DroppedStream{}) + if err := m.DroppedStreams[len(m.DroppedStreams)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipLogproto(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *DroppedStream) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DroppedStream: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DroppedStream: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field From", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.From, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field To", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.To, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Labels = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipLogproto(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipLogproto(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/pkg/logql/ast.go b/pkg/logql/ast.go index 95341a0f..53167645 100644 --- a/pkg/logql/ast.go +++ b/pkg/logql/ast.go @@ -26,6 +26,7 @@ type Querier interface { // Expr is a LogQL expression. type Expr interface { Eval(Querier) (iter.EntryIterator, error) + Matchers() []*labels.Matcher } type matchersExpr struct { @@ -36,12 +37,20 @@ func (e *matchersExpr) Eval(q Querier) (iter.EntryIterator, error) { return q.Query(e.matchers) } +func (e *matchersExpr) Matchers() []*labels.Matcher { + return e.matchers +} + type filterExpr struct { left Expr ty labels.MatchType match string } +func (e *filterExpr) Matchers() []*labels.Matcher { + return e.left.Matchers() +} + // NewFilterExpr wraps an existing Expr with a next filter expression. func NewFilterExpr(left Expr, ty labels.MatchType, match string) Expr { return &filterExpr{ diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 6241d868..b724c681 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -21,10 +21,9 @@ import ( ) const ( - defaultQueryLimit = 100 - defaulSince = 1 * time.Hour - pingPeriod = 1 * time.Second - bufferSizeForTailResponse = 10 + defaultQueryLimit = 100 + defaulSince = 1 * time.Hour + pingPeriod = 1 * time.Second ) // nolint @@ -155,16 +154,6 @@ func (q *Querier) LabelHandler(w http.ResponseWriter, r *http.Request) { } } -type droppedEntry struct { - Timestamp time.Time - Labels string -} - -type tailResponse struct { - Stream logproto.Stream - DroppedEntries []droppedEntry -} - // TailHandler is a http.HandlerFunc for handling tail queries. func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { upgrader := websocket.Upgrader{ @@ -192,10 +181,8 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { // response from httpRequestToQueryRequest is a ptr, if we keep passing pointer down the call then it would stay on // heap until connection to websocket stays open tailRequest := *tailRequestPtr - responseChan := make(chan tailResponse, bufferSizeForTailResponse) - closeErrChan := make(chan error) - tailer, err := q.Tail(r.Context(), &tailRequest, responseChan, closeErrChan) + tailer, err := q.Tail(r.Context(), &tailRequest) if err != nil { if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { level.Error(util.Logger).Log("Error connecting to ingesters for tailing", fmt.Sprintf("%v", err)) @@ -211,22 +198,22 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { ticker := time.NewTicker(pingPeriod) defer ticker.Stop() - var response tailResponse + var response *TailResponse + responseChan := tailer.getResponseChan() + closeErrChan := tailer.getCloseErrorChan() for { select { case response = <-responseChan: - err := conn.WriteJSON(response) + err := conn.WriteJSON(*response) if err != nil { level.Error(util.Logger).Log("Error writing to websocket", fmt.Sprintf("%v", err)) if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { level.Error(util.Logger).Log("Error writing close message to websocket", fmt.Sprintf("%v", err)) } - if err := tailer.close(); err != nil { - level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err)) - } return } + case err := <-closeErrChan: level.Error(util.Logger).Log("Error from iterator", fmt.Sprintf("%v", err)) if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index bf137691..cd8c7c82 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -62,19 +62,14 @@ func (q *Querier) forAllIngesters(f func(logproto.QuerierClient) (interface{}, e return nil, err } - return q.forGivenIngesters(replicationSet.Ingesters, f) + return q.forGivenIngesters(replicationSet, f) } // forGivenIngesters runs f, in parallel, for given ingesters // TODO taken from Cortex, see if we can refactor out an usable interface. -func (q *Querier) forGivenIngesters(ingesters []ring.IngesterDesc, f func(logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) { - replicationSet, err := q.ring.GetAll() - if err != nil { - return nil, err - } - +func (q *Querier) forGivenIngesters(replicationSet ring.ReplicationSet, f func(logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) { resps, errs := make(chan responseFromIngesters), make(chan error) - for _, ingester := range ingesters { + for _, ingester := range replicationSet.Ingesters { go func(ingester ring.IngesterDesc) { client, err := q.pool.GetClientFor(ingester.Addr) if err != nil { @@ -233,7 +228,7 @@ func mergePair(s1, s2 []string) []string { } // Tail keeps getting matching logs from all ingesters for given query -func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest, responseChan chan<- tailResponse, closeErrChan chan<- error) (*Tailer, error) { +func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error) { clients, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) { return client.Tail(ctx, req) }) @@ -246,7 +241,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest, responseC tailClients[clients[i].addr] = clients[i].response.(logproto.Querier_TailClient) } - return newTailer(time.Duration(req.DelayFor)*time.Second, tailClients, responseChan, closeErrChan, func(from, to time.Time, labels string) (iterator iter.EntryIterator, e error) { + return newTailer(time.Duration(req.DelayFor)*time.Second, tailClients, func(from, to time.Time, labels string) (iterator iter.EntryIterator, e error) { return q.queryDroppedStreams(ctx, req, from, to, labels) }, func(connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) { return q.tailDisconnectedIngesteres(ctx, req, connectedIngestersAddr) @@ -275,7 +270,7 @@ func (q *Querier) queryDroppedStreams(ctx context.Context, req *logproto.TailReq Limit: 10000, } - clients, err := q.forGivenIngesters(replicationSet.Ingesters, func(client logproto.QuerierClient) (interface{}, error) { + clients, err := q.forGivenIngesters(replicationSet, func(client logproto.QuerierClient) (interface{}, error) { return client.Query(ctx, &query) }) if err != nil { @@ -292,7 +287,7 @@ func (q *Querier) queryDroppedStreams(ctx context.Context, req *logproto.TailReq return nil, err } - iterators := append(chunkStoreIterators, ingesterIterators...) + iterators := append(ingesterIterators, chunkStoreIterators) return iter.NewHeapIterator(iterators, query.Direction), nil } @@ -317,7 +312,7 @@ func (q *Querier) tailDisconnectedIngesteres(ctx context.Context, req *logproto. } } - clients, err := q.forGivenIngesters(disconnectedIngesters, func(client logproto.QuerierClient) (interface{}, error) { + clients, err := q.forGivenIngesters(ring.ReplicationSet{Ingesters: disconnectedIngesters}, func(client logproto.QuerierClient) (interface{}, error) { return client.Tail(ctx, req) }) if err != nil { diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index 9c42d626..71323b2b 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -1,7 +1,6 @@ package querier import ( - "container/heap" "fmt" "sync" "time" @@ -13,13 +12,28 @@ import ( "github.com/pkg/errors" ) -// if we are not seeing any response from ingester, how long do we want to wait by going into sleep -const nextEntryWait = time.Second / 2 +const ( + // if we are not seeing any response from ingester, how long do we want to wait by going into sleep + nextEntryWait = time.Second / 2 -// keep checking connections with ingesters in duration -const checkConnectionsWithIngestersPeriod = time.Second * 5 + // keep checking connections with ingesters in duration + checkConnectionsWithIngestersPeriod = time.Second * 5 -// dropped streams are collected into a heap to quickly find dropped stream which has oldest timestamp + bufferSizeForTailResponse = 10 +) + +type droppedEntry struct { + Timestamp time.Time + Labels string +} + +// TailResponse holds response sent by tailer +type TailResponse struct { + Stream []logproto.Stream + DroppedEntries []droppedEntry +} + +/*// dropped streams are collected into a heap to quickly find dropped stream which has oldest timestamp type droppedStreamsIterator []logproto.DroppedStream func (h droppedStreamsIterator) Len() int { return len(h) } @@ -47,17 +61,17 @@ func (h droppedStreamsIterator) Less(i, j int) bool { return t1.Before(t2) } return h[i].Labels < h[j].Labels -} +}*/ // Tailer manages complete lifecycle of a tail request type Tailer struct { // openStreamIterator is for streams already open which can be complete streams returned by ingester or // dropped streams queried from ingester and store - openStreamIterator iter.HeapIterator - droppedStreamsIterator interface { // for holding dropped stream metadata + openStreamIterator iter.HeapIterator + /*droppedStreamsIterator interface { // for holding dropped stream metadata heap.Interface Peek() time.Time - } + }*/ streamMtx sync.Mutex // for synchronizing access to openStreamIterator and droppedStreamsIterator currEntry logproto.Entry @@ -71,9 +85,10 @@ type Tailer struct { stopped bool blocked bool + blockedMtx sync.RWMutex delayFor time.Duration - responseChan chan<- tailResponse - closeErrChan chan<- error + responseChan chan *TailResponse + closeErrChan chan error // when tail client is slow, drop entry and store its details in droppedEntries to notify client droppedEntries []droppedEntry @@ -91,6 +106,8 @@ func (t *Tailer) loop() { ticker := time.NewTicker(checkConnectionsWithIngestersPeriod) defer ticker.Stop() + tailResponse := new(TailResponse) + for { if t.stopped { break @@ -106,37 +123,44 @@ func (t *Tailer) loop() { } if !t.next() { - if len(t.querierTailClients) == 0 { - // All the connections to ingesters are dropped, try reconnecting or return error - if err := t.checkIngesterConnections(); err != nil { - level.Error(util.Logger).Log("Error reconnecting to ingesters", fmt.Sprintf("%v", err)) - } else { - continue - } - if err := t.close(); err != nil { - level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err)) + if len(tailResponse.Stream) == 0 { + if len(t.querierTailClients) == 0 { + // All the connections to ingesters are dropped, try reconnecting or return error + if err := t.checkIngesterConnections(); err != nil { + level.Error(util.Logger).Log("Error reconnecting to ingesters", fmt.Sprintf("%v", err)) + } else { + continue + } + if err := t.close(); err != nil { + level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err)) + } + t.closeErrChan <- errors.New("All ingesters closed the connection") + break } - t.closeErrChan <- errors.New("All ingesters closed the connection") - break + time.Sleep(nextEntryWait) + continue + } + } else { + // If channel is blocked already, drop current entry directly to save the effort + if t.isBlocked() { + t.dropEntry(t.currEntry.Timestamp, t.currLabels, nil) + continue } - time.Sleep(nextEntryWait) - continue - } - // If channel is blocked already, drop current entry directly to save the effort - if t.blocked { - t.dropEntry(t.currEntry.Timestamp, t.currLabels) - continue + tailResponse.Stream = append(tailResponse.Stream, logproto.Stream{Labels: t.currLabels, Entries: []logproto.Entry{t.currEntry}}) + if len(tailResponse.Stream) != 100 { + continue + } + tailResponse.DroppedEntries = t.popDroppedEntries() } - response := tailResponse{Stream: logproto.Stream{Labels: t.currLabels, Entries: []logproto.Entry{t.currEntry}}, DroppedEntries: t.popDroppedEntries()} + //response := []tailResponse{{Stream: logproto.Stream{Labels: t.currLabels, Entries: responses[t.currLabels]}, DroppedEntries: t.popDroppedEntries()}} select { - case t.responseChan <- response: - t.blocked = false + case t.responseChan <- tailResponse: default: - t.blocked = true - t.dropEntry(t.currEntry.Timestamp, t.currLabels) + t.dropEntry(t.currEntry.Timestamp, t.currLabels, tailResponse.DroppedEntries) } + tailResponse = new(TailResponse) } } @@ -197,11 +221,11 @@ func (t *Tailer) pushTailResponseFromIngester(resp *logproto.TailResponse) { defer t.streamMtx.Unlock() t.openStreamIterator.Push(iter.NewStreamIterator(resp.Stream)) - if resp.DroppedStreams != nil { + /*if resp.DroppedStreams != nil { for idx := range resp.DroppedStreams { heap.Push(t.droppedStreamsIterator, *resp.DroppedStreams[idx]) } - } + }*/ } // finds oldest entry by peeking at open stream iterator and dropped stream iterator. @@ -212,7 +236,10 @@ func (t *Tailer) next() bool { t.streamMtx.Lock() defer t.streamMtx.Unlock() - // if we don't have any entries or any of the entries are not older than now()-delay then return false + if t.openStreamIterator.Len() == 0 || !time.Now().After(t.openStreamIterator.Peek().Add(t.delayFor)) || !t.openStreamIterator.Next() { + return false + } + /*// if we don't have any entries or any of the entries are not older than now()-delay then return false if !((t.openStreamIterator.Len() != 0 && time.Now().After(t.openStreamIterator.Peek().Add(t.delayFor))) || (t.droppedStreamsIterator.Len() != 0 && time.Now().After(t.droppedStreamsIterator.Peek().Add(t.delayFor)))) { return false } @@ -235,7 +262,7 @@ func (t *Tailer) next() bool { if !t.openStreamIterator.Next() { return false - } + }*/ t.currEntry = t.openStreamIterator.Entry() t.currLabels = t.openStreamIterator.Labels() @@ -250,11 +277,26 @@ func (t *Tailer) close() error { return t.openStreamIterator.Close() } -func (t *Tailer) dropEntry(timestamp time.Time, labels string) { +func (t *Tailer) dropEntry(timestamp time.Time, labels string, alreadyDroppedEntries []droppedEntry) { + t.blockedMtx.Lock() + defer t.blockedMtx.Unlock() + + t.droppedEntries = append(t.droppedEntries, alreadyDroppedEntries...) t.droppedEntries = append(t.droppedEntries, droppedEntry{timestamp, labels}) } +func (t *Tailer) isBlocked() bool { + t.blockedMtx.RLock() + defer t.blockedMtx.RUnlock() + + return t.blocked +} + func (t *Tailer) popDroppedEntries() []droppedEntry { + t.blockedMtx.Lock() + defer t.blockedMtx.Unlock() + + t.blocked = false if len(t.droppedEntries) == 0 { return nil } @@ -264,18 +306,25 @@ func (t *Tailer) popDroppedEntries() []droppedEntry { return droppedEntries } +func (t *Tailer) getResponseChan() <-chan *TailResponse { + return t.responseChan +} + +func (t *Tailer) getCloseErrorChan() <-chan error { + return t.closeErrChan +} + func newTailer(delayFor time.Duration, querierTailClients map[string]logproto.Querier_TailClient, - responseChan chan<- tailResponse, closeErrChan chan<- error, queryDroppedStreams func(from, to time.Time, labels string) (iter.EntryIterator, error), tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error)) *Tailer { t := Tailer{ - openStreamIterator: iter.NewHeapIterator([]iter.EntryIterator{}, logproto.FORWARD), - droppedStreamsIterator: &droppedStreamsIterator{}, + openStreamIterator: iter.NewHeapIterator([]iter.EntryIterator{}, logproto.FORWARD), + //droppedStreamsIterator: &droppedStreamsIterator{}, querierTailClients: querierTailClients, queryDroppedStreams: queryDroppedStreams, delayFor: delayFor, - responseChan: responseChan, - closeErrChan: closeErrChan, + responseChan: make(chan *TailResponse, bufferSizeForTailResponse), + closeErrChan: make(chan error), tailDisconnectedIngesters: tailDisconnectedIngesters, } -- GitLab