Skip to content
Snippets Groups Projects
Commit b4dafac4 authored by Sandeep Sukhani's avatar Sandeep Sukhani Committed by Cyril Tovena
Browse files

Changes for running Table Manager with loki in single binary (#600)

parent 3d3c0571
No related branches found
No related tags found
No related merge requests found
......@@ -27,15 +27,16 @@ type Config struct {
Target moduleName `yaml:"target,omitempty"`
AuthEnabled bool `yaml:"auth_enabled,omitempty"`
Server server.Config `yaml:"server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
IngesterClient client.Config `yaml:"ingester_client,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
ChunkStoreConfig chunk.StoreConfig `yaml:"chunk_store_config,omitempty"`
SchemaConfig chunk.SchemaConfig `yaml:"schema_config,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config,omitempty"`
Server server.Config `yaml:"server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
IngesterClient client.Config `yaml:"ingester_client,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
ChunkStoreConfig chunk.StoreConfig `yaml:"chunk_store_config,omitempty"`
SchemaConfig chunk.SchemaConfig `yaml:"schema_config,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config,omitempty"`
TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"`
}
// RegisterFlags registers flag.
......@@ -55,19 +56,21 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.ChunkStoreConfig.RegisterFlags(f)
c.SchemaConfig.RegisterFlags(f)
c.LimitsConfig.RegisterFlags(f)
c.TableManager.RegisterFlags(f)
}
// Loki is the root datastructure for Loki.
type Loki struct {
cfg Config
server *server.Server
ring *ring.Ring
overrides *validation.Overrides
distributor *distributor.Distributor
ingester *ingester.Ingester
querier *querier.Querier
store chunk.Store
server *server.Server
ring *ring.Ring
overrides *validation.Overrides
distributor *distributor.Distributor
ingester *ingester.Ingester
querier *querier.Querier
store chunk.Store
tableManager *chunk.TableManager
httpAuthMiddleware middleware.Interface
}
......
......@@ -3,12 +3,17 @@ package loki
import (
"fmt"
"net/http"
"os"
"strings"
"time"
"github.com/go-kit/kit/log/level"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
......@@ -19,6 +24,8 @@ import (
"github.com/grafana/loki/pkg/querier"
)
const maxChunkAgeForTableManager = 12 * time.Hour
type moduleName int
// The various modules that make up Loki.
......@@ -30,6 +37,7 @@ const (
Ingester
Querier
Store
TableManager
All
)
......@@ -49,6 +57,8 @@ func (m moduleName) String() string {
return "ingester"
case Querier:
return "querier"
case TableManager:
return "table-manager"
case All:
return "all"
default:
......@@ -79,6 +89,9 @@ func (m *moduleName) Set(s string) error {
case "querier":
*m = Querier
return nil
case "table-manager":
*m = TableManager
return nil
case "all":
*m = All
return nil
......@@ -155,6 +168,50 @@ func (t *Loki) stopIngester() error {
return nil
}
func (t *Loki) initTableManager() error {
err := t.cfg.SchemaConfig.Load()
if err != nil {
return err
}
// Assume the newest config is the one to use
lastConfig := &t.cfg.SchemaConfig.Configs[len(t.cfg.SchemaConfig.Configs)-1]
if (t.cfg.TableManager.ChunkTables.WriteScale.Enabled ||
t.cfg.TableManager.IndexTables.WriteScale.Enabled ||
t.cfg.TableManager.ChunkTables.InactiveWriteScale.Enabled ||
t.cfg.TableManager.IndexTables.InactiveWriteScale.Enabled ||
t.cfg.TableManager.ChunkTables.ReadScale.Enabled ||
t.cfg.TableManager.IndexTables.ReadScale.Enabled ||
t.cfg.TableManager.ChunkTables.InactiveReadScale.Enabled ||
t.cfg.TableManager.IndexTables.InactiveReadScale.Enabled) &&
(t.cfg.StorageConfig.AWSStorageConfig.ApplicationAutoScaling.URL == nil && t.cfg.StorageConfig.AWSStorageConfig.Metrics.URL == "") {
level.Error(util.Logger).Log("msg", "WriteScale is enabled but no ApplicationAutoScaling or Metrics URL has been provided")
os.Exit(1)
}
tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.cfg.StorageConfig)
if err != nil {
return err
}
bucketClient, err := storage.NewBucketClient(t.cfg.StorageConfig)
util.CheckFatal("initializing bucket client", err)
t.tableManager, err = chunk.NewTableManager(t.cfg.TableManager, t.cfg.SchemaConfig, maxChunkAgeForTableManager, tableClient, bucketClient)
if err != nil {
return err
}
t.tableManager.Start()
return nil
}
func (t *Loki) stopTableManager() error {
t.tableManager.Stop()
return nil
}
func (t *Loki) initStore() (err error) {
t.store, err = storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides)
return
......@@ -253,7 +310,13 @@ var modules = map[moduleName]module{
init: (*Loki).initQuerier,
},
TableManager: {
deps: []moduleName{Server},
init: (*Loki).initTableManager,
stop: (*Loki).stopTableManager,
},
All: {
deps: []moduleName{Querier, Ingester, Distributor},
deps: []moduleName{Querier, Ingester, Distributor, TableManager},
},
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment