Skip to content
Snippets Groups Projects
Commit 3b69cf12 authored by Tom Wilkie's avatar Tom Wilkie
Browse files

Don't repeat client config.


Signed-off-by: default avatarTom Wilkie <tom.wilkie@gmail.com>
parent 3f6eefb8
No related branches found
No related tags found
No related merge requests found
......@@ -7,29 +7,30 @@ import (
"os"
"strings"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/opentracing-contrib/go-stdlib/nethttp"
opentracing "github.com/opentracing/opentracing-go"
log "github.com/sirupsen/logrus"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/tempo/pkg/distributor"
"github.com/grafana/tempo/pkg/flagext"
"github.com/grafana/tempo/pkg/ingester"
"github.com/grafana/tempo/pkg/ingester/client"
"github.com/grafana/tempo/pkg/logproto"
"github.com/grafana/tempo/pkg/querier"
)
type config struct {
serverConfig server.Config
distributorConfig distributor.Config
ingesterConfig ingester.Config
querierConfig querier.Config
serverConfig server.Config
distributorConfig distributor.Config
ingesterConfig ingester.Config
querierConfig querier.Config
ingesterClientConfig client.Config
}
func (c *config) RegisterFlags(f *flag.FlagSet) {
......@@ -39,7 +40,7 @@ func (c *config) RegisterFlags(f *flag.FlagSet) {
}
flagext.RegisterConfigs(f, &c.serverConfig, &c.distributorConfig,
&c.ingesterConfig, &c.querierConfig)
&c.ingesterConfig, &c.querierConfig, &c.ingesterClientConfig)
}
type Tempo struct {
......@@ -136,7 +137,7 @@ var modules = map[moduleName]module{
Distributor: module{
deps: []moduleName{Ring, Server},
init: func(t *Tempo, cfg *config) (err error) {
t.distributor, err = distributor.New(cfg.distributorConfig, t.ring)
t.distributor, err = distributor.New(cfg.distributorConfig, cfg.ingesterClientConfig, t.ring)
if err != nil {
return
}
......@@ -178,7 +179,7 @@ var modules = map[moduleName]module{
Querier: module{
deps: []moduleName{Ring, Server},
init: func(t *Tempo, cfg *config) (err error) {
t.querier, err = querier.New(cfg.querierConfig, t.ring)
t.querier, err = querier.New(cfg.querierConfig, cfg.ingesterClientConfig, t.ring)
if err != nil {
return
}
......
......@@ -5,7 +5,6 @@ import (
"flag"
"hash/fnv"
"sync/atomic"
"time"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
......@@ -47,36 +46,30 @@ func init() {
// Config for a Distributor.
type Config struct {
RemoteTimeout time.Duration
ClientConfig client.Config
PoolConfig cortex_client.PoolConfig
}
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.ClientConfig.RegisterFlags(f)
cfg.PoolConfig.RegisterFlags(f)
f.DurationVar(&cfg.RemoteTimeout, "ingester.remote-timeout", 10*time.Second, "")
}
// Distributor coordinates replicates and distribution of log streams.
type Distributor struct {
cfg Config
ring ring.ReadRing
pool *cortex_client.Pool
cfg Config
clientCfg client.Config
ring ring.ReadRing
pool *cortex_client.Pool
}
// New a distributor creates.
func New(cfg Config, ring ring.ReadRing) (*Distributor, error) {
func New(cfg Config, clientCfg client.Config, ring ring.ReadRing) (*Distributor, error) {
factory := func(addr string) (grpc_health_v1.HealthClient, error) {
return client.New(cfg.ClientConfig, addr)
return client.New(clientCfg, addr)
}
cfg.PoolConfig.RemoteTimeout = cfg.RemoteTimeout
return &Distributor{
cfg: cfg,
ring: ring,
pool: cortex_client.NewPool(cfg.PoolConfig, ring, factory, util.Logger),
cfg: cfg,
clientCfg: clientCfg,
ring: ring,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, util.Logger),
}, nil
}
......@@ -142,7 +135,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
for ingester, samples := range samplesByIngester {
go func(ingester ring.IngesterDesc, samples []*streamTracker) {
// Use a background context to make sure all ingesters get samples even if we return early
localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout)
defer cancel()
localCtx = user.InjectOrgID(localCtx, userID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
......
......@@ -3,7 +3,9 @@ package client
import (
"flag"
"io"
"time"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/grafana/tempo/pkg/logproto"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/mwitkow/go-grpc-middleware"
......@@ -14,11 +16,15 @@ import (
)
type Config struct {
PoolConfig cortex_client.PoolConfig
MaxRecvMsgSize int
RemoteTimeout time.Duration
}
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRecvMsgSize, "ingester.client.max-recv-message-size", 64*1024*1024, "Maximum message size, in bytes, this client will receive.")
f.DurationVar(&cfg.PoolConfig.RemoteTimeout, "ingester.client.healthcheck-timeout", 1*time.Second, "Timeout for healthcheck rpcs.")
f.DurationVar(&cfg.RemoteTimeout, "ingester.client.timeout", 5*time.Second, "Timeout for ingester client RPCs.")
}
func New(cfg Config, addr string) (grpc_health_v1.HealthClient, error) {
......
......@@ -16,16 +16,12 @@ import (
)
type Config struct {
PoolConfig cortex_client.PoolConfig
PoolConfig cortex_client.PoolConfig
RemoteTimeout time.Duration
ClientConfig client.Config
}
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.ClientConfig.RegisterFlags(f)
cfg.PoolConfig.RegisterFlags(f)
f.DurationVar(&cfg.RemoteTimeout, "querier.remote-timeout", 10*time.Second, "")
}
......@@ -35,15 +31,15 @@ type Querier struct {
pool *cortex_client.Pool
}
func New(cfg Config, ring ring.ReadRing) (*Querier, error) {
func New(cfg Config, clientCfg client.Config, ring ring.ReadRing) (*Querier, error) {
factory := func(addr string) (grpc_health_v1.HealthClient, error) {
return client.New(cfg.ClientConfig, addr)
return client.New(clientCfg, addr)
}
return &Querier{
cfg: cfg,
ring: ring,
pool: cortex_client.NewPool(cfg.PoolConfig, ring, factory, util.Logger),
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, util.Logger),
}, nil
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment