diff --git a/Gopkg.lock b/Gopkg.lock index b28ae4da1741b9f3e4d257c15216cb5896dadb10..8c60d724f65ca920918d793d07653f6c6b042310 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1575,6 +1575,7 @@ "github.com/coreos/go-systemd/sdjournal", "github.com/cortexproject/cortex/pkg/chunk", "github.com/cortexproject/cortex/pkg/chunk/encoding", + "github.com/cortexproject/cortex/pkg/chunk/local", "github.com/cortexproject/cortex/pkg/chunk/storage", "github.com/cortexproject/cortex/pkg/ingester/client", "github.com/cortexproject/cortex/pkg/ingester/index", @@ -1637,6 +1638,7 @@ "golang.org/x/net/context", "google.golang.org/grpc", "google.golang.org/grpc/health/grpc_health_v1", + "google.golang.org/grpc/metadata", "gopkg.in/alecthomas/kingpin.v2", "gopkg.in/fsnotify.v1", "gopkg.in/yaml.v2", diff --git a/Makefile b/Makefile index 9fa45ebb43ab5950fb3b3a47af57fd5642173307..41ab056eabcfb9353bc215fbfa506c2f47754bf9 100644 --- a/Makefile +++ b/Makefile @@ -312,3 +312,7 @@ push-plugin: build-plugin enable-plugin: docker plugin enable grafana/loki-docker-driver:$(PLUGIN_TAG) + +benchmark-store: + go run ./pkg/storage/hack/main.go + go test ./pkg/storage/ -bench=. -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out diff --git a/cmd/loki/loki-local-config.yaml b/cmd/loki/loki-local-config.yaml index 17e3511305ee5530e02df48ce54647d1e19c84a3..dac5d2841a6528c901f5c055ccf4e277eec4ee43 100644 --- a/cmd/loki/loki-local-config.yaml +++ b/cmd/loki/loki-local-config.yaml @@ -10,7 +10,8 @@ ingester: kvstore: store: inmemory replication_factor: 1 - chunk_idle_period: 15m + chunk_idle_period: 5m + chunk_retain_period: 30s schema_config: configs: diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index 8393c771655ab0d369df377dbfb63c37b959df22..89a3b4128feefb6d025e5b612419d841ba8c52af 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -6,6 +6,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" ) const ( @@ -51,7 +52,7 @@ func (c *dumbChunk) Size() int { // Returns an iterator that goes from _most_ recent to _least_ recent (ie, // backwards). -func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error) { +func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction, _ logql.Filter) (iter.EntryIterator, error) { i := sort.Search(len(c.entries), func(i int) bool { return !from.After(c.entries[i].Timestamp) }) diff --git a/pkg/chunkenc/facade.go b/pkg/chunkenc/facade.go index e10defe37ece215d8150e71fd827f03acee1b0e1..ef3801d1dc2226e368df7435c2c43f3eb4e30930 100644 --- a/pkg/chunkenc/facade.go +++ b/pkg/chunkenc/facade.go @@ -17,7 +17,7 @@ func init() { }) } -// Facade for compatibility with cortex chunk type, so we can use it's chunk store. +// Facade for compatibility with cortex chunk type, so we can use its chunk store. type Facade struct { c Chunk encoding.Chunk diff --git a/pkg/chunkenc/gzip.go b/pkg/chunkenc/gzip.go index 634728a6b7de5ee57a610440e3feba9c8aba758d..ff5bc5d54ce22d7e717b63c8be987046fe2226cc 100644 --- a/pkg/chunkenc/gzip.go +++ b/pkg/chunkenc/gzip.go @@ -3,18 +3,15 @@ package chunkenc import ( "bufio" "bytes" - "compress/gzip" "encoding/binary" "hash" "hash/crc32" "io" - "math" "time" - "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/iter" - + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" "github.com/pkg/errors" ) @@ -53,8 +50,7 @@ type MemChunk struct { head *headBlock encoding Encoding - cw func(w io.Writer) CompressionWriter - cr func(r io.Reader) (CompressionReader, error) + cPool CompressionPool } type block struct { @@ -96,10 +92,10 @@ func (hb *headBlock) append(ts int64, line string) error { return nil } -func (hb *headBlock) serialise(cw func(w io.Writer) CompressionWriter) ([]byte, error) { +func (hb *headBlock) serialise(pool CompressionPool) ([]byte, error) { buf := &bytes.Buffer{} encBuf := make([]byte, binary.MaxVarintLen64) - compressedWriter := cw(buf) + compressedWriter := pool.GetWriter(buf) for _, logEntry := range hb.entries { n := binary.PutVarint(encBuf, logEntry.t) _, err := compressedWriter.Write(encBuf[:n]) @@ -120,7 +116,7 @@ func (hb *headBlock) serialise(cw func(w io.Writer) CompressionWriter) ([]byte, if err := compressedWriter.Close(); err != nil { return nil, errors.Wrap(err, "flushing pending compress buffer") } - + pool.PutWriter(compressedWriter) return buf.Bytes(), nil } @@ -136,18 +132,14 @@ func NewMemChunkSize(enc Encoding, blockSize int) *MemChunk { blockSize: blockSize, // The blockSize in bytes. blocks: []block{}, - head: &headBlock{ - mint: math.MaxInt64, - maxt: math.MinInt64, - }, + head: &headBlock{}, encoding: enc, } switch enc { case EncGZIP: - c.cw = func(w io.Writer) CompressionWriter { return gzip.NewWriter(w) } - c.cr = func(r io.Reader) (CompressionReader, error) { return gzip.NewReader(r) } + c.cPool = &Gzip default: panic("unknown encoding") } @@ -163,8 +155,8 @@ func NewMemChunk(enc Encoding) *MemChunk { // NewByteChunk returns a MemChunk on the passed bytes. func NewByteChunk(b []byte) (*MemChunk, error) { bc := &MemChunk{ - cr: func(r io.Reader) (CompressionReader, error) { return gzip.NewReader(r) }, - head: &headBlock{}, // Dummy, empty headblock. + cPool: &Gzip, + head: &headBlock{}, // Dummy, empty headblock. } db := decbuf{b: b} @@ -192,6 +184,7 @@ func NewByteChunk(b []byte) (*MemChunk, error) { // Read the number of blocks. num := db.uvarint() + bc.blocks = make([]block, 0, num) for i := 0; i < num; i++ { blk := block{} @@ -343,7 +336,7 @@ func (c *MemChunk) cut() error { return nil } - b, err := c.head.serialise(c.cw) + b, err := c.head.serialise(c.cPool) if err != nil { return err } @@ -384,22 +377,19 @@ func (c *MemChunk) Bounds() (fromT, toT time.Time) { } // Iterator implements Chunk. -func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction) (iter.EntryIterator, error) { +func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) { mint, maxt := mintT.UnixNano(), maxtT.UnixNano() - its := make([]iter.EntryIterator, 0, len(c.blocks)) + its := make([]iter.EntryIterator, 0, len(c.blocks)+1) for _, b := range c.blocks { if maxt > b.mint && b.maxt > mint { - it, err := b.iterator(c.cr) - if err != nil { - return nil, err - } - - its = append(its, it) + its = append(its, b.iterator(c.cPool, filter)) } } - its = append(its, c.head.iterator(mint, maxt)) + if !c.head.isEmpty() { + its = append(its, c.head.iterator(mint, maxt, filter)) + } iterForward := iter.NewTimeRangedIterator( iter.NewNonOverlappingIterator(its, ""), @@ -414,21 +404,14 @@ func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction return iter.NewEntryIteratorBackward(iterForward) } -func (b block) iterator(cr func(io.Reader) (CompressionReader, error)) (iter.EntryIterator, error) { +func (b block) iterator(pool CompressionPool, filter logql.Filter) iter.EntryIterator { if len(b.b) == 0 { - return emptyIterator, nil - } - - r, err := cr(bytes.NewBuffer(b.b)) - if err != nil { - return nil, err + return emptyIterator } - - s := bufio.NewReader(r) - return newBufferedIterator(s), nil + return newBufferedIterator(pool, b.b, filter) } -func (hb *headBlock) iterator(mint, maxt int64) iter.EntryIterator { +func (hb *headBlock) iterator(mint, maxt int64, filter logql.Filter) iter.EntryIterator { if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) { return emptyIterator } @@ -438,8 +421,16 @@ func (hb *headBlock) iterator(mint, maxt int64) iter.EntryIterator { // but the tradeoff is that queries to near-realtime data would be much lower than // cutting of blocks. - entries := make([]entry, len(hb.entries)) - copy(entries, hb.entries) + entries := make([]entry, 0, len(hb.entries)) + for _, e := range hb.entries { + if filter == nil || filter([]byte(e.s)) { + entries = append(entries, e) + } + } + + if len(entries) == 0 { + return emptyIterator + } return &listIterator{ entries: entries, @@ -477,73 +468,107 @@ func (li *listIterator) Close() error { return nil } func (li *listIterator) Labels() string { return "" } type bufferedIterator struct { - s *bufio.Reader + s *bufio.Reader + reader CompressionReader + pool CompressionPool - curT int64 - curLog string + cur logproto.Entry err error - buf []byte // The buffer a single entry. - decBuf []byte // The buffer for decoding the lengths. + buf *bytes.Buffer // The buffer for a single entry. + decBuf []byte // The buffer for decoding the lengths. + + closed bool + + filter logql.Filter } -func newBufferedIterator(s *bufio.Reader) *bufferedIterator { +func newBufferedIterator(pool CompressionPool, b []byte, filter logql.Filter) *bufferedIterator { + r := pool.GetReader(bytes.NewBuffer(b)) return &bufferedIterator{ - s: s, - buf: make([]byte, 1024), + s: BufReaderPool.Get(r), + reader: r, + pool: pool, + filter: filter, + buf: BytesBufferPool.Get(), decBuf: make([]byte, binary.MaxVarintLen64), } } func (si *bufferedIterator) Next() bool { + for { + ts, line, ok := si.moveNext() + if !ok { + si.Close() + return false + } + if si.filter != nil && !si.filter(line) { + continue + } + si.cur.Line = string(line) + si.cur.Timestamp = time.Unix(0, ts) + return true + } +} + +// moveNext moves the buffer to the next entry +func (si *bufferedIterator) moveNext() (int64, []byte, bool) { ts, err := binary.ReadVarint(si.s) if err != nil { if err != io.EOF { si.err = err } - return false + return 0, nil, false } l, err := binary.ReadUvarint(si.s) if err != nil { if err != io.EOF { si.err = err - - return false + return 0, nil, false } } - for len(si.buf) < int(l) { - si.buf = append(si.buf, make([]byte, 1024)...) + if si.buf.Cap() < int(l) { + si.buf.Grow(int(l) - si.buf.Cap()) } - n, err := si.s.Read(si.buf[:l]) + n, err := si.s.Read(si.buf.Bytes()[:l]) if err != nil && err != io.EOF { si.err = err - return false + return 0, nil, false } - if n < int(l) { - _, err = si.s.Read(si.buf[n:l]) + for n < int(l) { + r, err := si.s.Read(si.buf.Bytes()[n:l]) if err != nil { si.err = err - return false + return 0, nil, false } + n += r } - - si.curT = ts - si.curLog = string(si.buf[:l]) - - return true + return ts, si.buf.Bytes()[:l], true } func (si *bufferedIterator) Entry() logproto.Entry { - return logproto.Entry{ - Timestamp: time.Unix(0, si.curT), - Line: si.curLog, - } + return si.cur +} + +func (si *bufferedIterator) Error() error { return si.err } + +func (si *bufferedIterator) Close() error { + if !si.closed { + si.closed = true + si.pool.PutReader(si.reader) + BufReaderPool.Put(si.s) + BytesBufferPool.Put(si.buf) + si.s = nil + si.buf = nil + si.decBuf = nil + si.reader = nil + return si.err + } + return si.err } -func (si *bufferedIterator) Error() error { return si.err } -func (si *bufferedIterator) Close() error { return si.err } func (si *bufferedIterator) Labels() string { return "" } diff --git a/pkg/chunkenc/gzip_test.go b/pkg/chunkenc/gzip_test.go index c210889352fa36aef911fc52160a53da242f8dea..5e1363e08b115ee92df727d6bd782c1799e972fb 100644 --- a/pkg/chunkenc/gzip_test.go +++ b/pkg/chunkenc/gzip_test.go @@ -5,11 +5,12 @@ import ( "fmt" "io/ioutil" "math" + "math/rand" + "sync" "testing" "time" "github.com/grafana/loki/pkg/logproto" - "github.com/stretchr/testify/require" ) @@ -75,7 +76,7 @@ func TestGZIPBlock(t *testing.T) { } } - it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD) + it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil) require.NoError(t, err) idx := 0 @@ -90,7 +91,7 @@ func TestGZIPBlock(t *testing.T) { require.Equal(t, len(cases), idx) t.Run("bounded-iteration", func(t *testing.T) { - it, err := chk.Iterator(time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD) + it, err := chk.Iterator(time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD, nil) require.NoError(t, err) idx := 2 @@ -132,7 +133,7 @@ func TestGZIPCompression(t *testing.T) { require.NoError(t, err) fmt.Println(float64(len(b))/(1024*1024), float64(len(b2))/(1024*1024), float64(len(b2))/float64(len(chk.blocks))) - it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD) + it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil) require.NoError(t, err) for i, l := range lines { @@ -162,7 +163,7 @@ func TestGZIPSerialisation(t *testing.T) { bc, err := NewByteChunk(byt) require.NoError(t, err) - it, err := bc.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD) + it, err := bc.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil) require.NoError(t, err) for i := 0; i < numSamples; i++ { require.True(t, it.Next()) @@ -203,7 +204,7 @@ func TestGZIPChunkFilling(t *testing.T) { require.Equal(t, int64(lines), i) - it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD) + it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, nil) require.NoError(t, err) i = 0 for it.Next() { @@ -215,6 +216,101 @@ func TestGZIPChunkFilling(t *testing.T) { require.Equal(t, int64(lines), i) } +var result []Chunk + +func BenchmarkWriteGZIP(b *testing.B) { + chunks := []Chunk{} + + entry := &logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: RandString(512), + } + i := int64(0) + + for n := 0; n < b.N; n++ { + c := NewMemChunk(EncGZIP) + // adds until full so we trigger cut which serialize using gzip + for c.SpaceFor(entry) { + _ = c.Append(entry) + entry.Timestamp = time.Unix(0, i) + i++ + } + chunks = append(chunks, c) + } + result = chunks +} + +func BenchmarkReadGZIP(b *testing.B) { + chunks := []Chunk{} + i := int64(0) + for n := 0; n < 50; n++ { + entry := randSizeEntry(0) + c := NewMemChunk(EncGZIP) + // adds until full so we trigger cut which serialize using gzip + for c.SpaceFor(entry) { + _ = c.Append(entry) + i++ + entry = randSizeEntry(i) + } + c.Close() + chunks = append(chunks, c) + } + entries := []logproto.Entry{} + b.ResetTimer() + for n := 0; n < b.N; n++ { + var wg sync.WaitGroup + for _, c := range chunks { + wg.Add(1) + go func(c Chunk) { + iterator, err := c.Iterator(time.Unix(0, 0), time.Now(), logproto.BACKWARD, nil) + if err != nil { + panic(err) + } + for iterator.Next() { + entries = append(entries, iterator.Entry()) + } + iterator.Close() + wg.Done() + }(c) + } + wg.Wait() + } + +} + +func randSizeEntry(ts int64) *logproto.Entry { + var line string + switch ts % 10 { + case 0: + line = RandString(27000) + case 1: + line = RandString(10000) + case 2, 3, 4, 5: + line = RandString(2048) + default: + line = RandString(4096) + } + return &logproto.Entry{ + Timestamp: time.Unix(0, ts), + Line: line, + } +} + +const charset = "abcdefghijklmnopqrstuvwxyz" + + "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + +func RandStringWithCharset(length int, charset string) string { + b := make([]byte, length) + for i := range b { + b[i] = charset[rand.Intn(len(charset)-1)] + } + return string(b) +} + +func RandString(length int) string { + return RandStringWithCharset(length, charset) +} + func logprotoEntry(ts int64, line string) *logproto.Entry { return &logproto.Entry{ Timestamp: time.Unix(0, ts), diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index b01358761ad40aa99014e4d7ae1403f22cecbe1e..08ee8cf3034c212264de9d84b0ebc30ef3bddfd1 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -7,6 +7,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" ) // Errors returned by the chunk interface. @@ -46,7 +47,7 @@ type Chunk interface { Bounds() (time.Time, time.Time) SpaceFor(*logproto.Entry) bool Append(*logproto.Entry) error - Iterator(from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error) + Iterator(from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) Size() int Bytes() ([]byte, error) } diff --git a/pkg/chunkenc/lazy_chunk.go b/pkg/chunkenc/lazy_chunk.go index f23b404925a9bddcec3b5544ccf5428415d5bfa9..cf710bfe5cffb4fb92e0eefb444340e9f2dbb6e8 100644 --- a/pkg/chunkenc/lazy_chunk.go +++ b/pkg/chunkenc/lazy_chunk.go @@ -7,6 +7,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" ) // LazyChunk loads the chunk when it is accessed. @@ -20,21 +21,20 @@ func (c *LazyChunk) getChunk(ctx context.Context) (Chunk, error) { if err != nil { return nil, err } - - c.Chunk = chunks[0] return chunks[0].Data.(*Facade).LokiChunk(), nil } // Iterator returns an entry iterator. -func (c LazyChunk) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error) { +func (c *LazyChunk) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) { // If the chunk is already loaded, then use that. if c.Chunk.Data != nil { lokiChunk := c.Chunk.Data.(*Facade).LokiChunk() - return lokiChunk.Iterator(from, through, direction) + return lokiChunk.Iterator(from, through, direction, filter) } return &lazyIterator{ - chunk: c, + chunk: c, + filter: filter, from: from, through: through, @@ -46,12 +46,15 @@ func (c LazyChunk) Iterator(ctx context.Context, from, through time.Time, direct type lazyIterator struct { iter.EntryIterator - chunk LazyChunk + chunk *LazyChunk err error from, through time.Time direction logproto.Direction context context.Context + filter logql.Filter + + closed bool } func (it *lazyIterator) Next() bool { @@ -59,8 +62,16 @@ func (it *lazyIterator) Next() bool { return false } + if it.closed { + return false + } + if it.EntryIterator != nil { - return it.EntryIterator.Next() + next := it.EntryIterator.Next() + if !next { + it.Close() + } + return next } chk, err := it.chunk.getChunk(it.context) @@ -68,9 +79,7 @@ func (it *lazyIterator) Next() bool { it.err = err return false } - - it.EntryIterator, it.err = chk.Iterator(it.from, it.through, it.direction) - + it.EntryIterator, it.err = chk.Iterator(it.from, it.through, it.direction, it.filter) return it.Next() } @@ -82,6 +91,18 @@ func (it *lazyIterator) Error() error { if it.err != nil { return it.err } + if it.EntryIterator != nil { + return it.EntryIterator.Error() + } + return nil +} - return it.EntryIterator.Error() +func (it *lazyIterator) Close() error { + if it.EntryIterator != nil { + it.closed = true + err := it.EntryIterator.Close() + it.EntryIterator = nil + return err + } + return nil } diff --git a/pkg/chunkenc/pool.go b/pkg/chunkenc/pool.go new file mode 100644 index 0000000000000000000000000000000000000000..a843159de25f7a20314806fe28b8a4833b2dbf44 --- /dev/null +++ b/pkg/chunkenc/pool.go @@ -0,0 +1,114 @@ +package chunkenc + +import ( + "bufio" + "bytes" + "compress/gzip" + + "io" + "sync" +) + +// CompressionPool is a pool of CompressionWriter and CompressionReader +// This is used by every chunk to avoid unnecessary allocations. +type CompressionPool interface { + GetWriter(io.Writer) CompressionWriter + PutWriter(CompressionWriter) + GetReader(io.Reader) CompressionReader + PutReader(CompressionReader) +} + +var ( + // Gzip is the gun zip compression pool + Gzip GzipPool + // BufReaderPool is bufio.Reader pool + BufReaderPool = &BufioReaderPool{ + pool: sync.Pool{ + New: func() interface{} { return bufio.NewReader(nil) }, + }, + } + // BytesBufferPool is a bytes buffer used for lines decompressed. + BytesBufferPool = newBufferPoolWithSize(4096) +) + +// GzipPool is a gun zip compression pool +type GzipPool struct { + readers sync.Pool + writers sync.Pool +} + +// GetReader gets or creates a new CompressionReader and reset it to read from src +func (pool *GzipPool) GetReader(src io.Reader) (reader CompressionReader) { + if r := pool.readers.Get(); r != nil { + reader = r.(CompressionReader) + err := reader.Reset(src) + if err != nil { + panic(err) + } + } else { + var err error + reader, err = gzip.NewReader(src) + if err != nil { + panic(err) + } + } + return reader +} + +// PutReader places back in the pool a CompressionReader +func (pool *GzipPool) PutReader(reader CompressionReader) { + pool.readers.Put(reader) +} + +// GetWriter gets or creates a new CompressionWriter and reset it to write to dst +func (pool *GzipPool) GetWriter(dst io.Writer) (writer CompressionWriter) { + if w := pool.writers.Get(); w != nil { + writer = w.(CompressionWriter) + writer.Reset(dst) + } else { + writer = gzip.NewWriter(dst) + } + return writer +} + +// PutWriter places back in the pool a CompressionWriter +func (pool *GzipPool) PutWriter(writer CompressionWriter) { + pool.writers.Put(writer) +} + +// BufioReaderPool is a bufio reader that uses sync.Pool. +type BufioReaderPool struct { + pool sync.Pool +} + +// Get returns a bufio.Reader which reads from r. The buffer size is that of the pool. +func (bufPool *BufioReaderPool) Get(r io.Reader) *bufio.Reader { + buf := bufPool.pool.Get().(*bufio.Reader) + buf.Reset(r) + return buf +} + +// Put puts the bufio.Reader back into the pool. +func (bufPool *BufioReaderPool) Put(b *bufio.Reader) { + bufPool.pool.Put(b) +} + +type bufferPool struct { + pool sync.Pool +} + +func newBufferPoolWithSize(size int) *bufferPool { + return &bufferPool{ + pool: sync.Pool{ + New: func() interface{} { return bytes.NewBuffer(make([]byte, size)) }, + }, + } +} + +func (bp *bufferPool) Get() *bytes.Buffer { + return bp.pool.Get().(*bytes.Buffer) +} + +func (bp *bufferPool) Put(b *bytes.Buffer) { + bp.pool.Put(b) +} diff --git a/pkg/ingester/chunk_test.go b/pkg/ingester/chunk_test.go index 4feb7fffe778a52df8c53615ceed406d69f34dc4..e7ab04b6ed9d926feb173d396cce4fcaafe71da9 100644 --- a/pkg/ingester/chunk_test.go +++ b/pkg/ingester/chunk_test.go @@ -60,7 +60,7 @@ func TestIterator(t *testing.T) { for i := 0; i < entries; i++ { from := rand.Intn(entries - 1) len := rand.Intn(entries-from) + 1 - iter, err := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD) + iter, err := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, nil) require.NoError(t, err) testIteratorForward(t, iter, int64(from), int64(from+len)) _ = iter.Close() @@ -69,7 +69,7 @@ func TestIterator(t *testing.T) { for i := 0; i < entries; i++ { from := rand.Intn(entries - 1) len := rand.Intn(entries-from) + 1 - iter, err := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD) + iter, err := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, nil) require.NoError(t, err) testIteratorBackward(t, iter, int64(from), int64(from+len)) _ = iter.Close() diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 601134807c063527f56e7986f46374b5477a2b4d..aab3f96f3862211dd02f7aa646104e332836ca77 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -11,6 +11,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" @@ -102,6 +103,14 @@ func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error { return nil } +func (s *testStore) IsLocal() bool { + return false +} + +func (s *testStore) LazyQuery(ctx context.Context, req *logproto.QueryRequest) (iter.EntryIterator, error) { + return nil, nil +} + func (s *testStore) Stop() {} func pushTestSamples(t *testing.T, ing logproto.PusherServer) ([]string, map[string][]*logproto.Stream) { @@ -174,7 +183,7 @@ func (s *testStore) checkData(t *testing.T, userIDs []string, testData map[strin } func buildStreamsFromChunk(t *testing.T, labels string, chk chunkenc.Chunk) *logproto.Stream { - it, err := chk.Iterator(time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD) + it, err := chk.Iterator(time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, nil) require.NoError(t, err) stream := &logproto.Stream{ diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 4de1930f6cd8ba231ffdc357da20100f23865c46..be38c6400aaf5492ca85f88d5acb698c12faafa4 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -112,8 +112,8 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie expr = logql.NewFilterExpr(expr, labels.MatchRegexp, req.Regex) } - querier := logql.QuerierFunc(func(matchers []*labels.Matcher) (iter.EntryIterator, error) { - iters, err := i.lookupStreams(req, matchers) + querier := logql.QuerierFunc(func(matchers []*labels.Matcher, filter logql.Filter) (iter.EntryIterator, error) { + iters, err := i.lookupStreams(req, matchers, filter) if err != nil { return nil, err } @@ -150,7 +150,7 @@ func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logpro }, nil } -func (i *instance) lookupStreams(req *logproto.QueryRequest, matchers []*labels.Matcher) ([]iter.EntryIterator, error) { +func (i *instance) lookupStreams(req *logproto.QueryRequest, matchers []*labels.Matcher, filter logql.Filter) ([]iter.EntryIterator, error) { i.streamsMtx.RLock() defer i.streamsMtx.RUnlock() @@ -170,7 +170,7 @@ outer: continue outer } } - iter, err := stream.Iterator(req.Start, req.End, req.Direction) + iter, err := stream.Iterator(req.Start, req.End, req.Direction, filter) if err != nil { return nil, err } diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index ccd7b6e6c08367125ed4222ce8a354029a5b6ebd..f3aab3d79760ba9e0636918769d8941cbdf0cd5f 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" ) var ( @@ -142,10 +143,10 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error { } // Returns an iterator. -func (s *stream) Iterator(from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error) { +func (s *stream) Iterator(from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) { iterators := make([]iter.EntryIterator, 0, len(s.chunks)) for _, c := range s.chunks { - itr, err := c.chunk.Iterator(from, through, direction) + itr, err := c.chunk.Iterator(from, through, direction, filter) if err != nil { return nil, err } diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index a61b100b16c5b55b481b15ce0a62a863eed93868..3febb271eada9592d5b53d694c866fd9365c17dc 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -40,7 +40,7 @@ func TestStreamIterator(t *testing.T) { for i := 0; i < 100; i++ { from := rand.Intn(chunks*entries - 1) len := rand.Intn(chunks*entries-from) + 1 - iter, err := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD) + iter, err := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, nil) require.NotNil(t, iter) require.NoError(t, err) testIteratorForward(t, iter, int64(from), int64(from+len)) @@ -50,7 +50,7 @@ func TestStreamIterator(t *testing.T) { for i := 0; i < 100; i++ { from := rand.Intn(entries - 1) len := rand.Intn(chunks*entries-from) + 1 - iter, err := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD) + iter, err := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, nil) require.NotNil(t, iter) require.NoError(t, err) testIteratorBackward(t, iter, int64(from), int64(from+len)) diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index c37ced48f14e497df858565aef042465a7e8e2e4..c332b2765b8a9f88cb35a0db2f2689a18c9f80bd 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -129,21 +129,22 @@ func (t *tailer) send(stream logproto.Stream) { } func (t *tailer) filterEntriesInStream(stream *logproto.Stream) error { - querier := logql.QuerierFunc(func(matchers []*labels.Matcher) (iter.EntryIterator, error) { - return iter.NewStreamIterator(stream), nil + querier := logql.QuerierFunc(func(matchers []*labels.Matcher, filter logql.Filter) (iter.EntryIterator, error) { + var filteredEntries []logproto.Entry + for _, e := range stream.Entries { + if filter == nil || filter([]byte(e.Line)) { + filteredEntries = append(filteredEntries, e) + } + } + stream.Entries = filteredEntries + return nil, nil }) - itr, err := t.expr.Eval(querier) + _, err := t.expr.Eval(querier) if err != nil { return err } - filteredEntries := new([]logproto.Entry) - for itr.Next() { - *filteredEntries = append(*filteredEntries, itr.Entry()) - } - - stream.Entries = *filteredEntries return nil } diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index fe18b7c0cae49f9f6441c6a93298bce22a32478b..dfe3287aa16c0ade1e44c5b67c3d241db1c2d914 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -113,6 +113,10 @@ type heapIterator struct { heap.Interface Peek() EntryIterator } + is []EntryIterator + prenext bool + + tuples tuples currEntry logproto.Entry currLabels string errs []error @@ -121,7 +125,7 @@ type heapIterator struct { // NewHeapIterator returns a new iterator which uses a heap to merge together // entries for multiple interators. func NewHeapIterator(is []EntryIterator, direction logproto.Direction) HeapIterator { - result := &heapIterator{} + result := &heapIterator{is: is} switch direction { case logproto.BACKWARD: result.heap = &iteratorMaxHeap{} @@ -131,11 +135,7 @@ func NewHeapIterator(is []EntryIterator, direction logproto.Direction) HeapItera panic("bad direction") } - // pre-next each iterator, drop empty. - for _, i := range is { - result.requeue(i, false) - } - + result.tuples = make([]tuple, 0, len(is)) return result } @@ -160,7 +160,21 @@ type tuple struct { EntryIterator } +type tuples []tuple + +func (t tuples) Len() int { return len(t) } +func (t tuples) Swap(i, j int) { t[i], t[j] = t[j], t[i] } +func (t tuples) Less(i, j int) bool { return t[i].Line < t[j].Line } + func (i *heapIterator) Next() bool { + if !i.prenext { + i.prenext = true + // pre-next each iterator, drop empty. + for _, it := range i.is { + i.requeue(it, false) + } + i.is = nil + } if i.heap.Len() == 0 { return false } @@ -170,16 +184,15 @@ func (i *heapIterator) Next() bool { // heap with the same timestamp, and pop the ones whose common value // occurs most often. - tuples := make([]tuple, 0, i.heap.Len()) for i.heap.Len() > 0 { next := i.heap.Peek() entry := next.Entry() - if len(tuples) > 0 && (tuples[0].Labels() != next.Labels() || !tuples[0].Timestamp.Equal(entry.Timestamp)) { + if len(i.tuples) > 0 && (i.tuples[0].Labels() != next.Labels() || !i.tuples[0].Timestamp.Equal(entry.Timestamp)) { break } heap.Pop(i.heap) - tuples = append(tuples, tuple{ + i.tuples = append(i.tuples, tuple{ Entry: entry, EntryIterator: next, }) @@ -187,22 +200,20 @@ func (i *heapIterator) Next() bool { // Find in entry which occurs most often which, due to quorum based // replication, is guaranteed to be the correct next entry. - t := mostCommon(tuples) + t := mostCommon(i.tuples) i.currEntry = t.Entry i.currLabels = t.Labels() // Requeue the iterators, advancing them if they were consumed. - for j := range tuples { - i.requeue(tuples[j].EntryIterator, tuples[j].Line != i.currEntry.Line) + for j := range i.tuples { + i.requeue(i.tuples[j].EntryIterator, i.tuples[j].Line != i.currEntry.Line) } - + i.tuples = i.tuples[:0] return true } -func mostCommon(tuples []tuple) tuple { - sort.Slice(tuples, func(i, j int) bool { - return tuples[i].Line < tuples[j].Line - }) +func mostCommon(tuples tuples) tuple { + sort.Sort(tuples) result := tuples[0] count, max := 0, 0 for i := 0; i < len(tuples)-1; i++ { @@ -247,6 +258,7 @@ func (i *heapIterator) Close() error { return err } } + i.tuples = nil return nil } @@ -314,28 +326,6 @@ func (i *queryClientIterator) Close() error { return i.client.CloseSend() } -type filter struct { - EntryIterator - f func(string) bool -} - -// NewFilter builds a filtering iterator. -func NewFilter(f func(string) bool, i EntryIterator) EntryIterator { - return &filter{ - f: f, - EntryIterator: i, - } -} - -func (i *filter) Next() bool { - for i.EntryIterator.Next() { - if i.f(i.Entry().Line) { - return true - } - } - return false -} - type nonOverlappingIterator struct { labels string i int @@ -353,12 +343,17 @@ func NewNonOverlappingIterator(iterators []EntryIterator, labels string) EntryIt func (i *nonOverlappingIterator) Next() bool { for i.curr == nil || !i.curr.Next() { - if i.i >= len(i.iterators) { + if len(i.iterators) == 0 { + if i.curr != nil { + i.curr.Close() + } return false } - - i.curr = i.iterators[i.i] + if i.curr != nil { + i.curr.Close() + } i.i++ + i.curr, i.iterators = i.iterators[0], i.iterators[1:] } return true @@ -377,10 +372,17 @@ func (i *nonOverlappingIterator) Labels() string { } func (i *nonOverlappingIterator) Error() error { - return i.curr.Error() + if i.curr != nil { + return i.curr.Error() + } + return nil } func (i *nonOverlappingIterator) Close() error { + for _, iter := range i.iterators { + iter.Close() + } + i.iterators = nil return nil } @@ -400,7 +402,10 @@ func NewTimeRangedIterator(it EntryIterator, mint, maxt time.Time) EntryIterator func (i *timeRangedIterator) Next() bool { ok := i.EntryIterator.Next() - + if !ok { + i.EntryIterator.Close() + return ok + } ts := i.EntryIterator.Entry().Timestamp for ok && i.mint.After(ts) { ok = i.EntryIterator.Next() @@ -410,34 +415,43 @@ func (i *timeRangedIterator) Next() bool { if ok && (i.maxt.Before(ts) || i.maxt.Equal(ts)) { // The maxt is exclusive. ok = false } - + if !ok { + i.EntryIterator.Close() + } return ok } type entryIteratorBackward struct { - cur logproto.Entry - entries []logproto.Entry + forwardIter EntryIterator + cur logproto.Entry + entries []logproto.Entry + loaded bool } // NewEntryIteratorBackward returns an iterator which loads all the entries // of an existing iterator, and then iterates over them backward. func NewEntryIteratorBackward(it EntryIterator) (EntryIterator, error) { - entries := make([]logproto.Entry, 0, 128) - for it.Next() { - entries = append(entries, it.Entry()) - } + return &entryIteratorBackward{entries: make([]logproto.Entry, 0, 1024), forwardIter: it}, it.Error() +} - return &entryIteratorBackward{entries: entries}, it.Error() +func (i *entryIteratorBackward) load() { + if !i.loaded { + i.loaded = true + for i.forwardIter.Next() { + entry := i.forwardIter.Entry() + i.entries = append(i.entries, entry) + } + i.forwardIter.Close() + } } func (i *entryIteratorBackward) Next() bool { + i.load() if len(i.entries) == 0 { + i.entries = nil return false } - - i.cur = i.entries[len(i.entries)-1] - i.entries = i.entries[:len(i.entries)-1] - + i.cur, i.entries = i.entries[len(i.entries)-1], i.entries[:len(i.entries)-1] return true } diff --git a/pkg/logql/ast.go b/pkg/logql/ast.go index 531676456e6c8dc4511fd0edd53758dba6547f57..83e40a90cf3d7f9f6454271aa62c5d368594bbe2 100644 --- a/pkg/logql/ast.go +++ b/pkg/logql/ast.go @@ -1,26 +1,29 @@ package logql import ( + "bytes" "fmt" "regexp" - "strings" "github.com/grafana/loki/pkg/iter" "github.com/prometheus/prometheus/pkg/labels" ) +// Filter is a line filter sent to a querier to filter out log line. +type Filter func([]byte) bool + // QuerierFunc implements Querier. -type QuerierFunc func([]*labels.Matcher) (iter.EntryIterator, error) +type QuerierFunc func([]*labels.Matcher, Filter) (iter.EntryIterator, error) // Query implements Querier. -func (q QuerierFunc) Query(ms []*labels.Matcher) (iter.EntryIterator, error) { - return q(ms) +func (q QuerierFunc) Query(ms []*labels.Matcher, entryFilter Filter) (iter.EntryIterator, error) { + return q(ms, entryFilter) } // Querier allows a LogQL expression to fetch an EntryIterator for a // set of matchers. type Querier interface { - Query([]*labels.Matcher) (iter.EntryIterator, error) + Query([]*labels.Matcher, Filter) (iter.EntryIterator, error) } // Expr is a LogQL expression. @@ -34,7 +37,7 @@ type matchersExpr struct { } func (e *matchersExpr) Eval(q Querier) (iter.EntryIterator, error) { - return q.Query(e.matchers) + return q.Query(e.matchers, nil) } func (e *matchersExpr) Matchers() []*labels.Matcher { @@ -60,45 +63,61 @@ func NewFilterExpr(left Expr, ty labels.MatchType, match string) Expr { } } -func (e *filterExpr) Eval(q Querier) (iter.EntryIterator, error) { - var f func(string) bool +func (e *filterExpr) filter() (func([]byte) bool, error) { + var f func([]byte) bool switch e.ty { case labels.MatchRegexp: re, err := regexp.Compile(e.match) if err != nil { return nil, err } - f = re.MatchString + f = re.Match case labels.MatchNotRegexp: re, err := regexp.Compile(e.match) if err != nil { return nil, err } - f = func(line string) bool { - return !re.MatchString(line) + f = func(line []byte) bool { + return !re.Match(line) } case labels.MatchEqual: - f = func(line string) bool { - return strings.Contains(line, e.match) + f = func(line []byte) bool { + return bytes.Contains(line, []byte(e.match)) } case labels.MatchNotEqual: - f = func(line string) bool { - return !strings.Contains(line, e.match) + f = func(line []byte) bool { + return !bytes.Contains(line, []byte(e.match)) } default: return nil, fmt.Errorf("unknow matcher: %v", e.match) } + next, ok := e.left.(*filterExpr) + if ok { + nextFilter, err := next.filter() + if err != nil { + return nil, err + } + return func(line []byte) bool { + return nextFilter(line) && f(line) + }, nil + } + return f, nil +} - left, err := e.left.Eval(q) +func (e *filterExpr) Eval(q Querier) (iter.EntryIterator, error) { + f, err := e.filter() if err != nil { return nil, err } - - return iter.NewFilter(f, left), nil + next, err := q.Query(e.left.Matchers(), f) + if err != nil { + return nil, err + } + return next, nil } func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher { diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 2d1d143a4b7a655764da3cc2e1a9b57f94021e1b..0622749fdd59b8ccf39a59499740c980b5ac3681 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/loki/pkg/ingester" "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/querier" + loki_storage "github.com/grafana/loki/pkg/storage" ) // Config is the root config for Loki. @@ -69,7 +70,7 @@ type Loki struct { distributor *distributor.Distributor ingester *ingester.Ingester querier *querier.Querier - store chunk.Store + store loki_storage.Store tableManager *chunk.TableManager httpAuthMiddleware middleware.Interface diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 2d63b1453d98cdbe842c1b985a7712d59a8e7c94..b5f05aef791f139c6cfbfcb30439a1d969e10c9f 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -22,6 +22,7 @@ import ( "github.com/grafana/loki/pkg/ingester" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/querier" + loki_storage "github.com/grafana/loki/pkg/storage" ) const maxChunkAgeForTableManager = 12 * time.Hour @@ -218,7 +219,7 @@ func (t *Loki) stopTableManager() error { } func (t *Loki) initStore() (err error) { - t.store, err = storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides) + t.store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides) return } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index eacf22fb0edd62e3df29220f0f8b28198105b18f..633fc587ffd4f3beee52f895ff2ea8f42146ba48 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -5,7 +5,6 @@ import ( "flag" "time" - "github.com/cortexproject/cortex/pkg/chunk" cortex_client "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util" @@ -18,6 +17,7 @@ import ( "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/storage" ) // Config for a querier. @@ -33,11 +33,11 @@ type Querier struct { cfg Config ring ring.ReadRing pool *cortex_client.Pool - store chunk.Store + store storage.Store } // New makes a new Querier. -func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, store chunk.Store) (*Querier, error) { +func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, store storage.Store) (*Querier, error) { factory := func(addr string) (grpc_health_v1.HealthClient, error) { return client.New(clientCfg, addr) } @@ -107,12 +107,13 @@ func (q *Querier) forGivenIngesters(replicationSet ring.ReplicationSet, f func(l // Query does the heavy lifting for an actual query. func (q *Querier) Query(ctx context.Context, req *logproto.QueryRequest) (*logproto.QueryResponse, error) { + ingesterIterators, err := q.queryIngesters(ctx, req) if err != nil { return nil, err } - chunkStoreIterators, err := q.queryStore(ctx, req) + chunkStoreIterators, err := q.store.LazyQuery(ctx, req) if err != nil { return nil, err } @@ -298,7 +299,7 @@ func (q *Querier) queryDroppedStreams(ctx context.Context, req *logproto.TailReq ingesterIterators[i] = iter.NewQueryClientIterator(clients[i].response.(logproto.Querier_QueryClient), query.Direction) } - chunkStoreIterators, err := q.queryStore(ctx, &query) + chunkStoreIterators, err := q.store.LazyQuery(ctx, &query) if err != nil { return nil, err } diff --git a/pkg/storage/hack/main.go b/pkg/storage/hack/main.go new file mode 100644 index 0000000000000000000000000000000000000000..c52f573a75a12a88671a38033b7aa68b41c6b8c9 --- /dev/null +++ b/pkg/storage/hack/main.go @@ -0,0 +1,142 @@ +package main + +import ( + "context" + "fmt" + "log" + "math/rand" + "os" + "sync" + "time" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/local" + "github.com/cortexproject/cortex/pkg/chunk/storage" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util/validation" + "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/logproto" + lstore "github.com/grafana/loki/pkg/storage" + "github.com/grafana/loki/pkg/util" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/weaveworks/common/user" +) + +var ( + start = model.Time(1523750400000) + ctx = user.InjectOrgID(context.Background(), "fake") + maxChunks = 600 // 600 chunks is 1.2bib of data enough to run benchmark +) + +// fill up the local filesystem store with 1gib of data to run benchmark +func main() { + if _, err := os.Stat("/tmp/benchmark/chunks"); os.IsNotExist(err) { + if err := fillStore(); err != nil { + log.Fatal("error filling up storage:", err) + } + } +} + +func getStore() (lstore.Store, error) { + store, err := lstore.NewStore( + storage.Config{ + BoltDBConfig: local.BoltDBConfig{Directory: "/tmp/benchmark/index"}, + FSConfig: local.FSConfig{Directory: "/tmp/benchmark/chunks"}, + }, + chunk.StoreConfig{}, + chunk.SchemaConfig{ + Configs: []chunk.PeriodConfig{ + { + From: chunk.DayTime{Time: start}, + IndexType: "boltdb", + ObjectType: "filesystem", + Schema: "v9", + IndexTables: chunk.PeriodicTableConfig{ + Prefix: "index_", + Period: time.Hour * 168, + }, + }, + }, + }, + &validation.Overrides{}, + ) + if err != nil { + return nil, err + } + return store, nil +} + +func fillStore() error { + + store, err := getStore() + if err != nil { + return err + } + defer store.Stop() + + var wgPush sync.WaitGroup + var flushCount int + // insert 5 streams with a random logs every nanoseconds + // the string is randomize so chunks are big ~2mb + // take ~1min to build 1gib of data + for i := 0; i < 5; i++ { + wgPush.Add(1) + go func(j int) { + defer wgPush.Done() + lbs, err := util.ToClientLabels(fmt.Sprintf("{foo=\"bar\",level=\"%d\"}", j)) + if err != nil { + panic(err) + } + labelsBuilder := labels.NewBuilder(client.FromLabelAdaptersToLabels(lbs)) + labelsBuilder.Set(labels.MetricName, "logs") + metric := labelsBuilder.Labels() + fp := client.FastFingerprint(lbs) + chunkEnc := chunkenc.NewMemChunkSize(chunkenc.EncGZIP, 262144) + for ts := start.UnixNano(); ts < start.UnixNano()+time.Hour.Nanoseconds(); ts = ts + time.Millisecond.Nanoseconds() { + entry := &logproto.Entry{ + Timestamp: time.Unix(0, ts), + Line: randString(250), + } + if chunkEnc.SpaceFor(entry) { + _ = chunkEnc.Append(entry) + } else { + from, to := chunkEnc.Bounds() + c := chunk.NewChunk("fake", fp, metric, chunkenc.NewFacade(chunkEnc), model.TimeFromUnixNano(from.UnixNano()), model.TimeFromUnixNano(to.UnixNano())) + if err := c.Encode(); err != nil { + panic(err) + } + err := store.Put(ctx, []chunk.Chunk{c}) + if err != nil { + panic(err) + } + flushCount++ + log.Println("flushed ", flushCount, from.UnixNano(), to.UnixNano(), metric) + if flushCount >= maxChunks { + return + } + chunkEnc = chunkenc.NewMemChunkSize(chunkenc.EncGZIP, 262144) + } + } + + }(i) + + } + wgPush.Wait() + return nil +} + +const charset = "abcdefghijklmnopqrstuvwxyz" + + "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + +func randStringWithCharset(length int, charset string) string { + b := make([]byte, length) + for i := range b { + b[i] = charset[rand.Intn(len(charset)-1)] + } + return string(b) +} + +func randString(length int) string { + return randStringWithCharset(length, charset) +} diff --git a/pkg/querier/store.go b/pkg/storage/store.go similarity index 63% rename from pkg/querier/store.go rename to pkg/storage/store.go index 9a2c90219a9b40427f6bde1e2d85bb10df0e7283..0ff8fccae6154ed6bae855fee4c1ea29a0c6c5e0 100644 --- a/pkg/querier/store.go +++ b/pkg/storage/store.go @@ -1,21 +1,45 @@ -package querier +package storage import ( "context" "sort" "github.com/cortexproject/cortex/pkg/chunk" - "github.com/opentracing/opentracing-go" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/pkg/labels" - + "github.com/cortexproject/cortex/pkg/chunk/storage" + "github.com/cortexproject/cortex/pkg/util/validation" "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" ) -func (q Querier) queryStore(ctx context.Context, req *logproto.QueryRequest) (iter.EntryIterator, error) { +// Store is the Loki chunk store to retrieve and save chunks. +type Store interface { + chunk.Store + LazyQuery(ctx context.Context, req *logproto.QueryRequest) (iter.EntryIterator, error) +} + +type store struct { + chunk.Store +} + +// NewStore creates a new Loki Store using configuration supplied. +func NewStore(cfg storage.Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits *validation.Overrides) (Store, error) { + s, err := storage.NewStore(cfg, storeCfg, schemaCfg, limits) + if err != nil { + return nil, err + } + return &store{ + Store: s, + }, nil +} + +// LazyQuery returns an iterator that will query the store for more chunks while iterating instead of fetching all chunks upfront +// for that request. +func (s *store) LazyQuery(ctx context.Context, req *logproto.QueryRequest) (iter.EntryIterator, error) { expr, err := logql.ParseExpr(req.Query) if err != nil { return nil, err @@ -25,7 +49,7 @@ func (q Querier) queryStore(ctx context.Context, req *logproto.QueryRequest) (it expr = logql.NewFilterExpr(expr, labels.MatchRegexp, req.Regex) } - querier := logql.QuerierFunc(func(matchers []*labels.Matcher) (iter.EntryIterator, error) { + querier := logql.QuerierFunc(func(matchers []*labels.Matcher, filter logql.Filter) (iter.EntryIterator, error) { nameLabelMatcher, err := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "logs") if err != nil { return nil, err @@ -33,7 +57,7 @@ func (q Querier) queryStore(ctx context.Context, req *logproto.QueryRequest) (it matchers = append(matchers, nameLabelMatcher) from, through := model.TimeFromUnixNano(req.Start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano()) - chks, fetchers, err := q.store.GetChunkRefs(ctx, from, through, matchers...) + chks, fetchers, err := s.GetChunkRefs(ctx, from, through, matchers...) if err != nil { return nil, err } @@ -46,15 +70,15 @@ func (q Querier) queryStore(ctx context.Context, req *logproto.QueryRequest) (it // Make sure the initial chunks are loaded. This is not one chunk // per series, but rather a chunk per non-overlapping iterator. - if err := loadFirstChunks(ctx, chksBySeries); err != nil { + if err := loadFirstChunks(ctx, chksBySeries, req); err != nil { return nil, err } // Now that we have the first chunk for each series loaded, // we can proceed to filter the series that don't match. - chksBySeries = filterSeriesByMatchers(chksBySeries, matchers) + chksBySeries = filterSeriesByMatchers(chksBySeries, matchers, req) - iters, err := buildIterators(ctx, req, chksBySeries) + iters, err := buildIterators(ctx, req, chksBySeries, filter) if err != nil { return nil, err } @@ -76,60 +100,77 @@ func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk. return filtered } -func filterSeriesByMatchers(chks map[model.Fingerprint][][]chunkenc.LazyChunk, matchers []*labels.Matcher) map[model.Fingerprint][][]chunkenc.LazyChunk { +func filterSeriesByMatchers(chks map[model.Fingerprint][][]chunkenc.LazyChunk, matchers []*labels.Matcher, req *logproto.QueryRequest) map[model.Fingerprint][][]chunkenc.LazyChunk { outer: for fp, chunks := range chks { for _, matcher := range matchers { - if !matcher.Matches(chunks[0][0].Chunk.Metric.Get(matcher.Name)) { - delete(chks, fp) - continue outer + // checks matchers against the last chunk if we're doing BACKWARD + if req.Direction == logproto.BACKWARD { + if !matcher.Matches(chunks[0][len(chunks[0])-1].Chunk.Metric.Get(matcher.Name)) { + delete(chks, fp) + continue outer + } + } else { + if !matcher.Matches(chunks[0][0].Chunk.Metric.Get(matcher.Name)) { + delete(chks, fp) + continue outer + } } + } } return chks } -func buildIterators(ctx context.Context, req *logproto.QueryRequest, chks map[model.Fingerprint][][]chunkenc.LazyChunk) ([]iter.EntryIterator, error) { +func buildIterators(ctx context.Context, req *logproto.QueryRequest, chks map[model.Fingerprint][][]chunkenc.LazyChunk, filter logql.Filter) ([]iter.EntryIterator, error) { result := make([]iter.EntryIterator, 0, len(chks)) for _, chunks := range chks { - iterator, err := buildHeapIterator(ctx, req, chunks) + iterator, err := buildHeapIterator(ctx, req, chunks, filter) if err != nil { return nil, err } - result = append(result, iterator) } return result, nil } -func buildHeapIterator(ctx context.Context, req *logproto.QueryRequest, chks [][]chunkenc.LazyChunk) (iter.EntryIterator, error) { +func buildHeapIterator(ctx context.Context, req *logproto.QueryRequest, chks [][]chunkenc.LazyChunk, filter logql.Filter) (iter.EntryIterator, error) { result := make([]iter.EntryIterator, 0, len(chks)) - if chks[0][0].Chunk.Metric.Has("__name__") { - labelsBuilder := labels.NewBuilder(chks[0][0].Chunk.Metric) + var fetchedChunkIndex int + if req.Direction == logproto.BACKWARD { + fetchedChunkIndex = len(chks[0]) - 1 + } + if chks[0][fetchedChunkIndex].Chunk.Metric.Has("__name__") { + labelsBuilder := labels.NewBuilder(chks[0][fetchedChunkIndex].Chunk.Metric) labelsBuilder.Del("__name__") - chks[0][0].Chunk.Metric = labelsBuilder.Labels() + chks[0][fetchedChunkIndex].Chunk.Metric = labelsBuilder.Labels() } - labels := chks[0][0].Chunk.Metric.String() + labels := chks[0][fetchedChunkIndex].Chunk.Metric.String() for i := range chks { iterators := make([]iter.EntryIterator, 0, len(chks[i])) for j := range chks[i] { - iterator, err := chks[i][j].Iterator(ctx, req.Start, req.End, req.Direction) + iterator, err := chks[i][j].Iterator(ctx, req.Start, req.End, req.Direction, filter) if err != nil { return nil, err } iterators = append(iterators, iterator) } - + // reverse chunks to start with the last one. + if req.Direction == logproto.BACKWARD { + for i, j := 0, len(iterators)-1; i < j; i, j = i+1, j-1 { + iterators[i], iterators[j] = iterators[j], iterators[i] + } + } result = append(result, iter.NewNonOverlappingIterator(iterators, labels)) } return iter.NewHeapIterator(result, req.Direction), nil } -func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]chunkenc.LazyChunk) error { +func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]chunkenc.LazyChunk, req *logproto.QueryRequest) error { sp, ctx := opentracing.StartSpanFromContext(ctx, "loadFirstChunks") defer sp.Finish() @@ -140,7 +181,12 @@ func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]chunken if len(lchk) == 0 { continue } - chksByFetcher[lchk[0].Fetcher] = append(chksByFetcher[lchk[0].Fetcher], &lchk[0]) + // load the last chunk if we're doing BACKWARD + if req.Direction == logproto.BACKWARD { + chksByFetcher[lchk[0].Fetcher] = append(chksByFetcher[lchk[0].Fetcher], &lchk[len(lchk)-1]) + } else { + chksByFetcher[lchk[0].Fetcher] = append(chksByFetcher[lchk[0].Fetcher], &lchk[0]) + } } } diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go new file mode 100644 index 0000000000000000000000000000000000000000..899fa12b74d17eb49f74b703f79b634f2e37a71c --- /dev/null +++ b/pkg/storage/store_test.go @@ -0,0 +1,163 @@ +package storage + +import ( + "context" + "log" + "runtime" + "testing" + "time" + + "net/http" + _ "net/http/pprof" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/local" + "github.com/cortexproject/cortex/pkg/chunk/storage" + "github.com/cortexproject/cortex/pkg/util/validation" + "github.com/grafana/loki/pkg/logproto" + "github.com/prometheus/common/model" + "github.com/weaveworks/common/user" +) + +var ( + start = model.Time(1523750400000) + m runtime.MemStats + ctx = user.InjectOrgID(context.Background(), "fake") + chunkStore = getStore() +) + +//go test -bench=. -benchmem -memprofile memprofile.out -cpuprofile profile.out +func Benchmark_store_LazyQueryRegexBackward(b *testing.B) { + benchmarkStoreQuery(b, &logproto.QueryRequest{ + Query: "{foo=\"bar\"}", + Regex: "fuzz", + Limit: 1000, + Start: time.Unix(0, start.UnixNano()), + End: time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()), + Direction: logproto.BACKWARD, + }) +} + +func Benchmark_store_LazyQueryLogQLBackward(b *testing.B) { + benchmarkStoreQuery(b, &logproto.QueryRequest{ + Query: "{foo=\"bar\"} |= \"test\" != \"toto\"", + Regex: "fuzz", + Limit: 1000, + Start: time.Unix(0, start.UnixNano()), + End: time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()), + Direction: logproto.BACKWARD, + }) +} + +func Benchmark_store_LazyQueryRegexForward(b *testing.B) { + benchmarkStoreQuery(b, &logproto.QueryRequest{ + Query: "{foo=\"bar\"}", + Regex: "fuzz", + Limit: 1000, + Start: time.Unix(0, start.UnixNano()), + End: time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()), + Direction: logproto.FORWARD, + }) +} + +func Benchmark_store_LazyQueryForward(b *testing.B) { + benchmarkStoreQuery(b, &logproto.QueryRequest{ + Query: "{foo=\"bar\"}", + Limit: 1000, + Start: time.Unix(0, start.UnixNano()), + End: time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()), + Direction: logproto.FORWARD, + }) +} + +func Benchmark_store_LazyQueryBackward(b *testing.B) { + benchmarkStoreQuery(b, &logproto.QueryRequest{ + Query: "{foo=\"bar\"}", + Limit: 1000, + Start: time.Unix(0, start.UnixNano()), + End: time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()), + Direction: logproto.BACKWARD, + }) +} + +func benchmarkStoreQuery(b *testing.B, query *logproto.QueryRequest) { + b.ReportAllocs() + // force to run gc 10x more often this can be useful to detect fast allocation vs leak. + //debug.SetGCPercent(10) + stop := make(chan struct{}) + go func() { + _ = http.ListenAndServe(":6060", http.DefaultServeMux) + }() + go func() { + ticker := time.NewTicker(time.Millisecond) + for { + select { + case <-ticker.C: + // print and capture the max in use heap size + printHeap(b, false) + case <-stop: + ticker.Stop() + return + } + } + }() + for i := 0; i < b.N; i++ { + iter, err := chunkStore.LazyQuery(ctx, query) + if err != nil { + b.Fatal(err) + } + res := []logproto.Entry{} + printHeap(b, false) + j := uint32(0) + for iter.Next() { + j++ + printHeap(b, false) + res = append(res, iter.Entry()) + // limit result like the querier would do. + if j == query.Limit { + break + } + } + iter.Close() + printHeap(b, true) + log.Println("line fetched", len(res)) + } + close(stop) +} + +var maxHeapInuse uint64 + +func printHeap(b *testing.B, show bool) { + runtime.ReadMemStats(&m) + if m.HeapInuse > maxHeapInuse { + maxHeapInuse = m.HeapInuse + } + if show { + log.Printf("Benchmark %d maxHeapInuse: %d Mbytes\n", b.N, maxHeapInuse/1024/1024) + log.Printf("Benchmark %d currentHeapInuse: %d Mbytes\n", b.N, m.HeapInuse/1024/1024) + } +} + +func getStore() Store { + store, err := NewStore(storage.Config{ + BoltDBConfig: local.BoltDBConfig{Directory: "/tmp/benchmark/index"}, + FSConfig: local.FSConfig{Directory: "/tmp/benchmark/chunks"}, + }, chunk.StoreConfig{}, chunk.SchemaConfig{ + Configs: []chunk.PeriodConfig{ + { + From: chunk.DayTime{Time: start}, + IndexType: "boltdb", + ObjectType: "filesystem", + Schema: "v9", + IndexTables: chunk.PeriodicTableConfig{ + Prefix: "index_", + Period: time.Hour * 168, + }, + }, + }, + }, &validation.Overrides{}) + if err != nil { + panic(err) + } + return store +} diff --git a/pkg/util/conv.go b/pkg/util/conv.go index 084885177541f54e2a42606f10d703186f238dc3..bc95638cb8ee0e9909aa5b8c6e18ee0896093e6d 100644 --- a/pkg/util/conv.go +++ b/pkg/util/conv.go @@ -1,19 +1,36 @@ package util import ( + "sort" + "strings" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/grafana/loki/pkg/logql" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/promql" ) +type byLabel []client.LabelAdapter + +func (s byLabel) Len() int { return len(s) } +func (s byLabel) Less(i, j int) bool { return strings.Compare(s[i].Name, s[j].Name) < 0 } +func (s byLabel) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + // ToClientLabels parses the labels and converts them to the Cortex type. func ToClientLabels(labels string) ([]client.LabelAdapter, error) { - ls, err := promql.ParseMetric(labels) + ls, err := logql.ParseExpr(labels) if err != nil { return nil, err } - - return client.FromLabelsToLabelAdapaters(ls), nil + matchers := ls.Matchers() + result := make([]client.LabelAdapter, 0, len(matchers)) + for _, m := range matchers { + result = append(result, client.LabelAdapter{ + Name: m.Name, + Value: m.Value, + }) + } + sort.Sort(byLabel(result)) + return result, nil } // ModelLabelSetToMap convert a model.LabelSet to a map[string]string