From 3b69cf127d18b3ff1c06c54caaea439fc40fcd31 Mon Sep 17 00:00:00 2001 From: Tom Wilkie <tom.wilkie@gmail.com> Date: Tue, 20 Nov 2018 14:13:36 +0000 Subject: [PATCH] Don't repeat client config. Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com> --- cmd/tempo/main.go | 21 +++++++++++---------- pkg/distributor/distributor.go | 29 +++++++++++------------------ pkg/ingester/client/client.go | 6 ++++++ pkg/querier/querier.go | 12 ++++-------- 4 files changed, 32 insertions(+), 36 deletions(-) diff --git a/cmd/tempo/main.go b/cmd/tempo/main.go index c2a6731d..dfc3d2ef 100644 --- a/cmd/tempo/main.go +++ b/cmd/tempo/main.go @@ -7,29 +7,30 @@ import ( "os" "strings" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util" "github.com/opentracing-contrib/go-stdlib/nethttp" opentracing "github.com/opentracing/opentracing-go" log "github.com/sirupsen/logrus" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" - - "github.com/cortexproject/cortex/pkg/ring" - "github.com/cortexproject/cortex/pkg/util" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/tempo/pkg/distributor" "github.com/grafana/tempo/pkg/flagext" "github.com/grafana/tempo/pkg/ingester" + "github.com/grafana/tempo/pkg/ingester/client" "github.com/grafana/tempo/pkg/logproto" "github.com/grafana/tempo/pkg/querier" ) type config struct { - serverConfig server.Config - distributorConfig distributor.Config - ingesterConfig ingester.Config - querierConfig querier.Config + serverConfig server.Config + distributorConfig distributor.Config + ingesterConfig ingester.Config + querierConfig querier.Config + ingesterClientConfig client.Config } func (c *config) RegisterFlags(f *flag.FlagSet) { @@ -39,7 +40,7 @@ func (c *config) RegisterFlags(f *flag.FlagSet) { } flagext.RegisterConfigs(f, &c.serverConfig, &c.distributorConfig, - &c.ingesterConfig, &c.querierConfig) + &c.ingesterConfig, &c.querierConfig, &c.ingesterClientConfig) } type Tempo struct { @@ -136,7 +137,7 @@ var modules = map[moduleName]module{ Distributor: module{ deps: []moduleName{Ring, Server}, init: func(t *Tempo, cfg *config) (err error) { - t.distributor, err = distributor.New(cfg.distributorConfig, t.ring) + t.distributor, err = distributor.New(cfg.distributorConfig, cfg.ingesterClientConfig, t.ring) if err != nil { return } @@ -178,7 +179,7 @@ var modules = map[moduleName]module{ Querier: module{ deps: []moduleName{Ring, Server}, init: func(t *Tempo, cfg *config) (err error) { - t.querier, err = querier.New(cfg.querierConfig, t.ring) + t.querier, err = querier.New(cfg.querierConfig, cfg.ingesterClientConfig, t.ring) if err != nil { return } diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 5c8b3656..b22b3e34 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -5,7 +5,6 @@ import ( "flag" "hash/fnv" "sync/atomic" - "time" cortex_client "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" @@ -47,36 +46,30 @@ func init() { // Config for a Distributor. type Config struct { - RemoteTimeout time.Duration - ClientConfig client.Config - PoolConfig cortex_client.PoolConfig } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.ClientConfig.RegisterFlags(f) - cfg.PoolConfig.RegisterFlags(f) - - f.DurationVar(&cfg.RemoteTimeout, "ingester.remote-timeout", 10*time.Second, "") } // Distributor coordinates replicates and distribution of log streams. type Distributor struct { - cfg Config - ring ring.ReadRing - pool *cortex_client.Pool + cfg Config + clientCfg client.Config + ring ring.ReadRing + pool *cortex_client.Pool } // New a distributor creates. -func New(cfg Config, ring ring.ReadRing) (*Distributor, error) { +func New(cfg Config, clientCfg client.Config, ring ring.ReadRing) (*Distributor, error) { factory := func(addr string) (grpc_health_v1.HealthClient, error) { - return client.New(cfg.ClientConfig, addr) + return client.New(clientCfg, addr) } - cfg.PoolConfig.RemoteTimeout = cfg.RemoteTimeout return &Distributor{ - cfg: cfg, - ring: ring, - pool: cortex_client.NewPool(cfg.PoolConfig, ring, factory, util.Logger), + cfg: cfg, + clientCfg: clientCfg, + ring: ring, + pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, util.Logger), }, nil } @@ -142,7 +135,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log for ingester, samples := range samplesByIngester { go func(ingester ring.IngesterDesc, samples []*streamTracker) { // Use a background context to make sure all ingesters get samples even if we return early - localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout) + localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout) defer cancel() localCtx = user.InjectOrgID(localCtx, userID) if sp := opentracing.SpanFromContext(ctx); sp != nil { diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index e236e60b..7573e06e 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -3,7 +3,9 @@ package client import ( "flag" "io" + "time" + cortex_client "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/grafana/tempo/pkg/logproto" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" "github.com/mwitkow/go-grpc-middleware" @@ -14,11 +16,15 @@ import ( ) type Config struct { + PoolConfig cortex_client.PoolConfig MaxRecvMsgSize int + RemoteTimeout time.Duration } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxRecvMsgSize, "ingester.client.max-recv-message-size", 64*1024*1024, "Maximum message size, in bytes, this client will receive.") + f.DurationVar(&cfg.PoolConfig.RemoteTimeout, "ingester.client.healthcheck-timeout", 1*time.Second, "Timeout for healthcheck rpcs.") + f.DurationVar(&cfg.RemoteTimeout, "ingester.client.timeout", 5*time.Second, "Timeout for ingester client RPCs.") } func New(cfg Config, addr string) (grpc_health_v1.HealthClient, error) { diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index af976966..a5815c58 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -16,16 +16,12 @@ import ( ) type Config struct { - PoolConfig cortex_client.PoolConfig - + PoolConfig cortex_client.PoolConfig RemoteTimeout time.Duration - ClientConfig client.Config } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.ClientConfig.RegisterFlags(f) cfg.PoolConfig.RegisterFlags(f) - f.DurationVar(&cfg.RemoteTimeout, "querier.remote-timeout", 10*time.Second, "") } @@ -35,15 +31,15 @@ type Querier struct { pool *cortex_client.Pool } -func New(cfg Config, ring ring.ReadRing) (*Querier, error) { +func New(cfg Config, clientCfg client.Config, ring ring.ReadRing) (*Querier, error) { factory := func(addr string) (grpc_health_v1.HealthClient, error) { - return client.New(cfg.ClientConfig, addr) + return client.New(clientCfg, addr) } return &Querier{ cfg: cfg, ring: ring, - pool: cortex_client.NewPool(cfg.PoolConfig, ring, factory, util.Logger), + pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, util.Logger), }, nil } -- GitLab