From b4dafac4885a5c347aaa72e555e066d636e49d4e Mon Sep 17 00:00:00 2001
From: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
Date: Fri, 24 May 2019 18:45:52 +0530
Subject: [PATCH] Changes for running Table Manager with loki in single binary
 (#600)

---
 pkg/loki/loki.go    | 35 +++++++++++++-----------
 pkg/loki/modules.go | 65 ++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 83 insertions(+), 17 deletions(-)

diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go
index 9fc21207..eb6bc8c5 100644
--- a/pkg/loki/loki.go
+++ b/pkg/loki/loki.go
@@ -27,15 +27,16 @@ type Config struct {
 	Target      moduleName `yaml:"target,omitempty"`
 	AuthEnabled bool       `yaml:"auth_enabled,omitempty"`
 
-	Server           server.Config      `yaml:"server,omitempty"`
-	Distributor      distributor.Config `yaml:"distributor,omitempty"`
-	Querier          querier.Config     `yaml:"querier,omitempty"`
-	IngesterClient   client.Config      `yaml:"ingester_client,omitempty"`
-	Ingester         ingester.Config    `yaml:"ingester,omitempty"`
-	StorageConfig    storage.Config     `yaml:"storage_config,omitempty"`
-	ChunkStoreConfig chunk.StoreConfig  `yaml:"chunk_store_config,omitempty"`
-	SchemaConfig     chunk.SchemaConfig `yaml:"schema_config,omitempty"`
-	LimitsConfig     validation.Limits  `yaml:"limits_config,omitempty"`
+	Server           server.Config            `yaml:"server,omitempty"`
+	Distributor      distributor.Config       `yaml:"distributor,omitempty"`
+	Querier          querier.Config           `yaml:"querier,omitempty"`
+	IngesterClient   client.Config            `yaml:"ingester_client,omitempty"`
+	Ingester         ingester.Config          `yaml:"ingester,omitempty"`
+	StorageConfig    storage.Config           `yaml:"storage_config,omitempty"`
+	ChunkStoreConfig chunk.StoreConfig        `yaml:"chunk_store_config,omitempty"`
+	SchemaConfig     chunk.SchemaConfig       `yaml:"schema_config,omitempty"`
+	LimitsConfig     validation.Limits        `yaml:"limits_config,omitempty"`
+	TableManager     chunk.TableManagerConfig `yaml:"table_manager,omitempty"`
 }
 
 // RegisterFlags registers flag.
@@ -55,19 +56,21 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
 	c.ChunkStoreConfig.RegisterFlags(f)
 	c.SchemaConfig.RegisterFlags(f)
 	c.LimitsConfig.RegisterFlags(f)
+	c.TableManager.RegisterFlags(f)
 }
 
 // Loki is the root datastructure for Loki.
 type Loki struct {
 	cfg Config
 
-	server      *server.Server
-	ring        *ring.Ring
-	overrides   *validation.Overrides
-	distributor *distributor.Distributor
-	ingester    *ingester.Ingester
-	querier     *querier.Querier
-	store       chunk.Store
+	server       *server.Server
+	ring         *ring.Ring
+	overrides    *validation.Overrides
+	distributor  *distributor.Distributor
+	ingester     *ingester.Ingester
+	querier      *querier.Querier
+	store        chunk.Store
+	tableManager *chunk.TableManager
 
 	httpAuthMiddleware middleware.Interface
 }
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index 54940eaa..9f60b6bb 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -3,12 +3,17 @@ package loki
 import (
 	"fmt"
 	"net/http"
+	"os"
 	"strings"
+	"time"
 
+	"github.com/go-kit/kit/log/level"
 	"google.golang.org/grpc/health/grpc_health_v1"
 
+	"github.com/cortexproject/cortex/pkg/chunk"
 	"github.com/cortexproject/cortex/pkg/chunk/storage"
 	"github.com/cortexproject/cortex/pkg/ring"
+	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/validation"
 	"github.com/weaveworks/common/middleware"
 	"github.com/weaveworks/common/server"
@@ -19,6 +24,8 @@ import (
 	"github.com/grafana/loki/pkg/querier"
 )
 
+const maxChunkAgeForTableManager = 12 * time.Hour
+
 type moduleName int
 
 // The various modules that make up Loki.
@@ -30,6 +37,7 @@ const (
 	Ingester
 	Querier
 	Store
+	TableManager
 	All
 )
 
@@ -49,6 +57,8 @@ func (m moduleName) String() string {
 		return "ingester"
 	case Querier:
 		return "querier"
+	case TableManager:
+		return "table-manager"
 	case All:
 		return "all"
 	default:
@@ -79,6 +89,9 @@ func (m *moduleName) Set(s string) error {
 	case "querier":
 		*m = Querier
 		return nil
+	case "table-manager":
+		*m = TableManager
+		return nil
 	case "all":
 		*m = All
 		return nil
@@ -155,6 +168,50 @@ func (t *Loki) stopIngester() error {
 	return nil
 }
 
+func (t *Loki) initTableManager() error {
+	err := t.cfg.SchemaConfig.Load()
+	if err != nil {
+		return err
+	}
+
+	// Assume the newest config is the one to use
+	lastConfig := &t.cfg.SchemaConfig.Configs[len(t.cfg.SchemaConfig.Configs)-1]
+
+	if (t.cfg.TableManager.ChunkTables.WriteScale.Enabled ||
+		t.cfg.TableManager.IndexTables.WriteScale.Enabled ||
+		t.cfg.TableManager.ChunkTables.InactiveWriteScale.Enabled ||
+		t.cfg.TableManager.IndexTables.InactiveWriteScale.Enabled ||
+		t.cfg.TableManager.ChunkTables.ReadScale.Enabled ||
+		t.cfg.TableManager.IndexTables.ReadScale.Enabled ||
+		t.cfg.TableManager.ChunkTables.InactiveReadScale.Enabled ||
+		t.cfg.TableManager.IndexTables.InactiveReadScale.Enabled) &&
+		(t.cfg.StorageConfig.AWSStorageConfig.ApplicationAutoScaling.URL == nil && t.cfg.StorageConfig.AWSStorageConfig.Metrics.URL == "") {
+		level.Error(util.Logger).Log("msg", "WriteScale is enabled but no ApplicationAutoScaling or Metrics URL has been provided")
+		os.Exit(1)
+	}
+
+	tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.cfg.StorageConfig)
+	if err != nil {
+		return err
+	}
+
+	bucketClient, err := storage.NewBucketClient(t.cfg.StorageConfig)
+	util.CheckFatal("initializing bucket client", err)
+
+	t.tableManager, err = chunk.NewTableManager(t.cfg.TableManager, t.cfg.SchemaConfig, maxChunkAgeForTableManager, tableClient, bucketClient)
+	if err != nil {
+		return err
+	}
+
+	t.tableManager.Start()
+	return nil
+}
+
+func (t *Loki) stopTableManager() error {
+	t.tableManager.Stop()
+	return nil
+}
+
 func (t *Loki) initStore() (err error) {
 	t.store, err = storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides)
 	return
@@ -253,7 +310,13 @@ var modules = map[moduleName]module{
 		init: (*Loki).initQuerier,
 	},
 
+	TableManager: {
+		deps: []moduleName{Server},
+		init: (*Loki).initTableManager,
+		stop: (*Loki).stopTableManager,
+	},
+
 	All: {
-		deps: []moduleName{Querier, Ingester, Distributor},
+		deps: []moduleName{Querier, Ingester, Distributor, TableManager},
 	},
 }
-- 
GitLab