diff --git a/cmd/tempo/main.go b/cmd/tempo/main.go index b061ac5b0e7930d17320bc9696bd5ad0a72ee4b4..6a4d6e938526552d583079985b767b77063fef5c 100644 --- a/cmd/tempo/main.go +++ b/cmd/tempo/main.go @@ -2,8 +2,10 @@ package main import ( "flag" + "fmt" "net/http" "os" + "strings" "github.com/opentracing-contrib/go-stdlib/nethttp" opentracing "github.com/opentracing/opentracing-go" @@ -23,142 +25,221 @@ import ( "github.com/grafana/tempo/pkg/querier" ) -type target struct { - deps []string - init func() error - stop func() +type config struct { + serverConfig server.Config + ringConfig ring.Config + distributorConfig distributor.Config + ingesterConfig ingester.Config + querierConfig querier.Config } -func main() { - var ( - flagset = flag.NewFlagSet("", flag.ExitOnError) - serverConfig = server.Config{ - MetricsNamespace: "tempo", - GRPCMiddleware: []grpc.UnaryServerInterceptor{ - middleware.ServerUserHeaderInterceptor, - }, - } - ringConfig ring.Config - distributorConfig distributor.Config - ingesterConfig ingester.Config - querierConfig querier.Config - ) - flagext.RegisterConfigs(flagset, &serverConfig, &ringConfig, &distributorConfig, - &ingesterConfig, &querierConfig) - flagset.Parse(os.Args[1:]) - util.InitLogger(&serverConfig) +func (c *config) RegisterFlags(f *flag.FlagSet) { + c.serverConfig.MetricsNamespace = "tempo" + c.serverConfig.GRPCMiddleware = []grpc.UnaryServerInterceptor{ + middleware.ServerUserHeaderInterceptor, + } + + flagext.RegisterConfigs(f, &c.serverConfig, &c.ringConfig, + &c.distributorConfig, &c.ingesterConfig, &c.querierConfig) +} + +type Tempo struct { + server *server.Server + ring *ring.Ring + distributor *distributor.Distributor + ingester *ingester.Ingester + querier *querier.Querier +} + +type moduleName int + +const ( + Ring moduleName = iota + Server + Distributor + Ingester + Querier + All +) + +func (m moduleName) String() string { + switch m { + case Ring: + return "ring" + case Server: + return "server" + case Distributor: + return "distributor" + case Ingester: + return "ingester" + case Querier: + return "querier" + case All: + return "all" + default: + panic(fmt.Sprintf("unknow module name: %d", m)) + } +} - server, err := server.New(serverConfig) - if err != nil { - log.Fatalf("Error initializing server: %v", err) +func (m *moduleName) Set(s string) error { + switch strings.ToLower(s) { + case "ring": + *m = Ring + return nil + case "server": + *m = Server + return nil + case "distributor": + *m = Distributor + return nil + case "ingester": + *m = Ingester + return nil + case "querier": + *m = Querier + return nil + case "all": + *m = All + return nil + default: + return fmt.Errorf("unrecognised module name: %s", s) } - defer server.Shutdown() - - var ( - r *ring.Ring - d *distributor.Distributor - i *ingester.Ingester - q *querier.Querier - - mods = map[string]target{ - "ring": target{ - init: func() (err error) { - r, err = ring.New(ringConfig) - return - }, - stop: func() { - r.Stop() - }, - }, - "distributor": target{ - deps: []string{"ring"}, - init: func() (err error) { - d, err = distributor.New(distributorConfig, r) - if err != nil { - return - } - operationNameFunc := nethttp.OperationNameFunc(func(r *http.Request) string { - return r.URL.RequestURI() - }) - server.HTTP.Handle("/api/prom/push", middleware.Merge( - middleware.Func(func(handler http.Handler) http.Handler { - return nethttp.Middleware(opentracing.GlobalTracer(), handler, operationNameFunc) - }), - middleware.AuthenticateUser, - ).Wrap(http.HandlerFunc(d.PushHandler))) - server.HTTP.Handle("/ring", r) - return - }, - stop: func() {}, - }, - "ingester": target{ - deps: []string{"ring"}, - init: func() (err error) { - ingesterConfig.LifecyclerConfig.ListenPort = &serverConfig.GRPCListenPort - i, err = ingester.New(ingesterConfig) - if err != nil { - return - } - logproto.RegisterPusherServer(server.GRPC, i) - logproto.RegisterQuerierServer(server.GRPC, i) - grpc_health_v1.RegisterHealthServer(server.GRPC, i) - server.HTTP.Path("/ready").Handler(http.HandlerFunc(i.ReadinessHandler)) - return - }, - stop: func() { - i.Shutdown() - }, - }, - "querier": target{ - deps: []string{"ring"}, - init: func() (err error) { - q, err = querier.New(querierConfig, r) - if err != nil { - return - } - operationNameFunc := nethttp.OperationNameFunc(func(r *http.Request) string { - return r.URL.RequestURI() - }) - httpMiddleware := middleware.Merge( - middleware.Func(func(handler http.Handler) http.Handler { - return nethttp.Middleware(opentracing.GlobalTracer(), handler, operationNameFunc) - }), - middleware.AuthenticateUser, - ) - server.HTTP.Handle("/api/prom/query", httpMiddleware.Wrap(http.HandlerFunc(q.QueryHandler))) - server.HTTP.Handle("/api/prom/label", httpMiddleware.Wrap(http.HandlerFunc(q.LabelHandler))) - server.HTTP.Handle("/api/prom/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(q.LabelHandler))) - return - }, - stop: func() {}, - }, - "lite": target{ - deps: []string{"querier", "ingester", "distributor"}, - init: func() (err error) { - return - }, - stop: func() {}, - }, - } - - inited = map[string]struct{}{} - ) - - var run func(mod string) - run = func(mod string) { - if _, ok := inited[mod]; ok { +} + +type module struct { + deps []moduleName + init func(t *Tempo, cfg *config) error + stop func(t *Tempo) +} + +var modules = map[moduleName]module{ + Ring: module{ + init: func(t *Tempo, cfg *config) (err error) { + t.ring, err = ring.New(cfg.ringConfig) + if err != nil { + return + } + t.server.HTTP.Handle("/ring", t.ring) + return + }, + }, + + Server: module{ + init: func(t *Tempo, cfg *config) (err error) { + t.server, err = server.New(cfg.serverConfig) + return + }, + stop: func(t *Tempo) { + t.server.Shutdown() + }, + }, + + Distributor: module{ + deps: []moduleName{Ring, Server}, + init: func(t *Tempo, cfg *config) (err error) { + t.distributor, err = distributor.New(cfg.distributorConfig, t.ring) + if err != nil { + return + } + + operationNameFunc := nethttp.OperationNameFunc(func(r *http.Request) string { + return r.URL.RequestURI() + }) + t.server.HTTP.Handle("/api/prom/push", middleware.Merge( + middleware.Func(func(handler http.Handler) http.Handler { + return nethttp.Middleware(opentracing.GlobalTracer(), handler, operationNameFunc) + }), + middleware.AuthenticateUser, + ).Wrap(http.HandlerFunc(t.distributor.PushHandler))) + + return + }, + }, + + Ingester: module{ + deps: []moduleName{Server}, + init: func(t *Tempo, cfg *config) (err error) { + cfg.ingesterConfig.LifecyclerConfig.ListenPort = &cfg.serverConfig.GRPCListenPort + t.ingester, err = ingester.New(cfg.ingesterConfig) + if err != nil { + return + } + + logproto.RegisterPusherServer(t.server.GRPC, t.ingester) + logproto.RegisterQuerierServer(t.server.GRPC, t.ingester) + grpc_health_v1.RegisterHealthServer(t.server.GRPC, t.ingester) + t.server.HTTP.Path("/ready").Handler(http.HandlerFunc(t.ingester.ReadinessHandler)) return - } - for _, dep := range mods[mod].deps { - run(dep) - } - if err := mods[mod].init(); err != nil { - log.Fatalf("Error initializing %s: %v", mod, err) - } - inited[mod] = struct{}{} + }, + stop: func(t *Tempo) { + t.ingester.Shutdown() + }, + }, + + Querier: module{ + deps: []moduleName{Ring, Server}, + init: func(t *Tempo, cfg *config) (err error) { + t.querier, err = querier.New(cfg.querierConfig, t.ring) + if err != nil { + return + } + + operationNameFunc := nethttp.OperationNameFunc(func(r *http.Request) string { + return r.URL.RequestURI() + }) + httpMiddleware := middleware.Merge( + middleware.Func(func(handler http.Handler) http.Handler { + return nethttp.Middleware(opentracing.GlobalTracer(), handler, operationNameFunc) + }), + middleware.AuthenticateUser, + ) + t.server.HTTP.Handle("/api/prom/query", httpMiddleware.Wrap(http.HandlerFunc(t.querier.QueryHandler))) + t.server.HTTP.Handle("/api/prom/label", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler))) + t.server.HTTP.Handle("/api/prom/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler))) + return + }, + }, + + All: module{ + deps: []moduleName{Querier, Ingester, Distributor}, + init: func(t *Tempo, cfg *config) (err error) { + return + }, + }, +} + +var ( + cfg config + tempo Tempo + inited = map[moduleName]struct{}{} +) + +func initModule(m moduleName) { + if _, ok := inited[m]; ok { + return + } + + for _, dep := range modules[m].deps { + initModule(dep) + } + + if err := modules[m].init(&tempo, &cfg); err != nil { + log.Fatalf("Error initializing %s: %v", m, err) } - run(flagset.Arg(0)) + inited[m] = struct{}{} +} + +func main() { + flagset := flag.NewFlagSet("", flag.ExitOnError) + target := All + flagset.Var(&target, "target", "target module (default All)") + flagext.RegisterConfigs(flagset, &cfg) + flagset.Parse(os.Args[1:]) + + util.InitLogger(&cfg.serverConfig) - server.Run() + initModule(target) + tempo.server.Run() + tempo.server.Shutdown() }