diff --git a/.gitignore b/.gitignore index 4f708e8b41c1464bc7343672cff54e695b0cef82..1cdb68f6f127bc32f2aee8620cf5978c11b40787 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,4 @@ cmd/promtail/promtail mixin/vendor/ pkg/logproto/logproto.pb.go pkg/parser/labels.go -tempo +/tempo diff --git a/cmd/tempo/main.go b/cmd/tempo/main.go index 7fa1cef9c6f94194c14c802e6cdabb0b17ef80dc..8242f1964908f93b3d3951f565086529445dd8d8 100644 --- a/cmd/tempo/main.go +++ b/cmd/tempo/main.go @@ -2,261 +2,47 @@ package main import ( "flag" - "fmt" "io/ioutil" - "net/http" "os" - "strings" "github.com/go-kit/kit/log/level" - "github.com/opentracing-contrib/go-stdlib/nethttp" - opentracing "github.com/opentracing/opentracing-go" + "github.com/grafana/tempo/pkg/tempo" "github.com/pkg/errors" - "google.golang.org/grpc" - "google.golang.org/grpc/health/grpc_health_v1" "gopkg.in/yaml.v2" - "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/weaveworks/common/middleware" - "github.com/weaveworks/common/server" - - "github.com/grafana/tempo/pkg/distributor" - "github.com/grafana/tempo/pkg/ingester" - "github.com/grafana/tempo/pkg/ingester/client" - "github.com/grafana/tempo/pkg/logproto" - "github.com/grafana/tempo/pkg/querier" -) - -type config struct { - Server server.Config `yaml:"server,omitempty"` - Distributor distributor.Config `yaml:"distributor,omitempty"` - Querier querier.Config `yaml:"querier,omitempty"` - IngesterClient client.Config `yaml:"ingester_client,omitempty"` - Ingester ingester.Config `yaml:"ingester,omitempty"` -} - -func (c *config) RegisterFlags(f *flag.FlagSet) { - c.Server.MetricsNamespace = "tempo" - c.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{ - middleware.ServerUserHeaderInterceptor, - } - c.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{ - middleware.StreamServerUserHeaderInterceptor, - } - - c.Server.RegisterFlags(f) - c.Distributor.RegisterFlags(f) - c.Querier.RegisterFlags(f) - c.IngesterClient.RegisterFlags(f) - c.Ingester.RegisterFlags(f) -} - -type Tempo struct { - server *server.Server - ring *ring.Ring - distributor *distributor.Distributor - ingester *ingester.Ingester - querier *querier.Querier -} - -type moduleName int - -const ( - Ring moduleName = iota - Server - Distributor - Ingester - Querier - All -) - -func (m moduleName) String() string { - switch m { - case Ring: - return "ring" - case Server: - return "server" - case Distributor: - return "distributor" - case Ingester: - return "ingester" - case Querier: - return "querier" - case All: - return "all" - default: - panic(fmt.Sprintf("unknow module name: %d", m)) - } -} - -func (m *moduleName) Set(s string) error { - switch strings.ToLower(s) { - case "ring": - *m = Ring - return nil - case "server": - *m = Server - return nil - case "distributor": - *m = Distributor - return nil - case "ingester": - *m = Ingester - return nil - case "querier": - *m = Querier - return nil - case "all": - *m = All - return nil - default: - return fmt.Errorf("unrecognised module name: %s", s) - } -} - -type module struct { - deps []moduleName - init func(t *Tempo, cfg *config) error - stop func(t *Tempo) -} - -var modules = map[moduleName]module{ - Server: module{ - init: func(t *Tempo, cfg *config) (err error) { - t.server, err = server.New(cfg.Server) - return - }, - }, - - Ring: module{ - deps: []moduleName{Server}, - init: func(t *Tempo, cfg *config) (err error) { - t.ring, err = ring.New(cfg.Ingester.LifecyclerConfig.RingConfig) - if err != nil { - return - } - t.server.HTTP.Handle("/ring", t.ring) - return - }, - }, - - Distributor: module{ - deps: []moduleName{Ring, Server}, - init: func(t *Tempo, cfg *config) (err error) { - t.distributor, err = distributor.New(cfg.Distributor, cfg.IngesterClient, t.ring) - if err != nil { - return - } - - operationNameFunc := nethttp.OperationNameFunc(func(r *http.Request) string { - return r.URL.RequestURI() - }) - t.server.HTTP.Handle("/api/prom/push", middleware.Merge( - middleware.Func(func(handler http.Handler) http.Handler { - return nethttp.Middleware(opentracing.GlobalTracer(), handler, operationNameFunc) - }), - middleware.AuthenticateUser, - ).Wrap(http.HandlerFunc(t.distributor.PushHandler))) - - return - }, - }, - - Ingester: module{ - deps: []moduleName{Server}, - init: func(t *Tempo, cfg *config) (err error) { - cfg.Ingester.LifecyclerConfig.ListenPort = &cfg.Server.GRPCListenPort - t.ingester, err = ingester.New(cfg.Ingester) - if err != nil { - return - } - - logproto.RegisterPusherServer(t.server.GRPC, t.ingester) - logproto.RegisterQuerierServer(t.server.GRPC, t.ingester) - grpc_health_v1.RegisterHealthServer(t.server.GRPC, t.ingester) - t.server.HTTP.Path("/ready").Handler(http.HandlerFunc(t.ingester.ReadinessHandler)) - return - }, - stop: func(t *Tempo) { - t.ingester.Shutdown() - }, - }, - - Querier: module{ - deps: []moduleName{Ring, Server}, - init: func(t *Tempo, cfg *config) (err error) { - t.querier, err = querier.New(cfg.Querier, cfg.IngesterClient, t.ring) - if err != nil { - return - } - - operationNameFunc := nethttp.OperationNameFunc(func(r *http.Request) string { - return r.URL.RequestURI() - }) - httpMiddleware := middleware.Merge( - middleware.Func(func(handler http.Handler) http.Handler { - return nethttp.Middleware(opentracing.GlobalTracer(), handler, operationNameFunc) - }), - middleware.AuthenticateUser, - ) - t.server.HTTP.Handle("/api/prom/query", httpMiddleware.Wrap(http.HandlerFunc(t.querier.QueryHandler))) - t.server.HTTP.Handle("/api/prom/label", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler))) - t.server.HTTP.Handle("/api/prom/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler))) - return - }, - }, - - All: module{ - deps: []moduleName{Querier, Ingester, Distributor}, - init: func(t *Tempo, cfg *config) (err error) { - return - }, - }, -} - -var ( - cfg config - tempo Tempo - inited = map[moduleName]struct{}{} ) -func initModule(m moduleName) error { - if _, ok := inited[m]; ok { - return nil - } - - for _, dep := range modules[m].deps { - initModule(dep) - } - - level.Info(util.Logger).Log("msg", "initialising", "module", m) - if err := modules[m].init(&tempo, &cfg); err != nil { - return errors.Wrap(err, fmt.Sprintf("error initialising module: %s", m)) - } +func main() { + var ( + cfg tempo.Config + configFile = "" + ) + flag.StringVar(&configFile, "config.file", "", "Configuration file to load.") + flagext.RegisterFlags(&cfg) + flag.Parse() - inited[m] = struct{}{} - return nil -} + util.InitLogger(&cfg.Server) -func stopModule(m moduleName) { - if _, ok := inited[m]; !ok { - return + if configFile != "" { + if err := readConfig(configFile, &cfg); err != nil { + level.Error(util.Logger).Log("msg", "error loading config", "filename", configFile, "err", err) + os.Exit(1) + } } - delete(inited, m) - for _, dep := range modules[m].deps { - stopModule(dep) + t, err := tempo.New(cfg) + if err != nil { + level.Error(util.Logger).Log("msg", "error initialising module", "err", err) + os.Exit(1) } - if modules[m].stop != nil { - level.Info(util.Logger).Log("msg", "stopping", "module", m) - modules[m].stop(&tempo) - } + t.Run() + t.Stop() } -func readConfig(filename string) error { +func readConfig(filename string, cfg *tempo.Config) error { f, err := os.Open(filename) if err != nil { return errors.Wrap(err, "error opening config file") @@ -273,32 +59,3 @@ func readConfig(filename string) error { } return nil } - -func main() { - var ( - target = All - configFile = "" - ) - flag.Var(&target, "target", "target module (default All)") - flag.StringVar(&configFile, "config.file", "", "Configuration file to load.") - flagext.RegisterFlags(&cfg) - flag.Parse() - - util.InitLogger(&cfg.Server) - - if configFile != "" { - if err := readConfig(configFile); err != nil { - level.Error(util.Logger).Log("msg", "error loading config", "filename", configFile, "err", err) - os.Exit(1) - } - } - - if err := initModule(target); err != nil { - level.Error(util.Logger).Log("msg", "error initialising module", "err", err) - os.Exit(1) - } - - tempo.server.Run() - tempo.server.Shutdown() - stopModule(target) -} diff --git a/docs/local.yaml b/docs/local.yaml index 5e0fe0189b4f0dfb2a0f65c5bb5f1e7893eb334b..93a6f6fcd5d5caf033303da46b1b04b19cc806f0 100644 --- a/docs/local.yaml +++ b/docs/local.yaml @@ -1,3 +1,5 @@ +auth_enabled: false + ingester: lifecycler: ring: diff --git a/pkg/tempo/fake_auth.go b/pkg/tempo/fake_auth.go new file mode 100644 index 0000000000000000000000000000000000000000..849f513b59d3055272c0a574d5dc0ba246051ee8 --- /dev/null +++ b/pkg/tempo/fake_auth.go @@ -0,0 +1,42 @@ +package tempo + +import ( + "context" + "net/http" + + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" + "google.golang.org/grpc" +) + +// Fake auth middlewares just injects a fake userID, so the rest of the code +// can continue to be multitenant. + +var fakeHTTPAuthMiddleware = middleware.Func(func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := user.InjectOrgID(r.Context(), "fake") + next.ServeHTTP(w, r.WithContext(ctx)) + }) +}) + +var fakeGRPCAuthUniaryMiddleware = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + ctx = user.InjectOrgID(ctx, "fake") + return handler(ctx, req) +} + +var fakeGRPCAuthStreamMiddleware = func(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + ctx := user.InjectOrgID(ss.Context(), "fake") + return handler(srv, serverStream{ + ctx: ctx, + ServerStream: ss, + }) +} + +type serverStream struct { + ctx context.Context + grpc.ServerStream +} + +func (ss serverStream) Context() context.Context { + return ss.ctx +} diff --git a/pkg/tempo/modules.go b/pkg/tempo/modules.go new file mode 100644 index 0000000000000000000000000000000000000000..ddfd383ffcc717703f6a7e6e90273bdd6bd6b398 --- /dev/null +++ b/pkg/tempo/modules.go @@ -0,0 +1,184 @@ +package tempo + +import ( + "fmt" + "net/http" + "strings" + + "github.com/opentracing-contrib/go-stdlib/nethttp" + opentracing "github.com/opentracing/opentracing-go" + "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/cortexproject/cortex/pkg/ring" + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/server" + + "github.com/grafana/tempo/pkg/distributor" + "github.com/grafana/tempo/pkg/ingester" + "github.com/grafana/tempo/pkg/logproto" + "github.com/grafana/tempo/pkg/querier" +) + +type moduleName int + +const ( + Ring moduleName = iota + Server + Distributor + Ingester + Querier + All +) + +func (m moduleName) String() string { + switch m { + case Ring: + return "ring" + case Server: + return "server" + case Distributor: + return "distributor" + case Ingester: + return "ingester" + case Querier: + return "querier" + case All: + return "all" + default: + panic(fmt.Sprintf("unknow module name: %d", m)) + } +} + +func (m *moduleName) Set(s string) error { + switch strings.ToLower(s) { + case "ring": + *m = Ring + return nil + case "server": + *m = Server + return nil + case "distributor": + *m = Distributor + return nil + case "ingester": + *m = Ingester + return nil + case "querier": + *m = Querier + return nil + case "all": + *m = All + return nil + default: + return fmt.Errorf("unrecognised module name: %s", s) + } +} + +func (t *Tempo) initServer() (err error) { + t.server, err = server.New(t.cfg.Server) + return +} + +func (t *Tempo) initRing() (err error) { + t.ring, err = ring.New(t.cfg.Ingester.LifecyclerConfig.RingConfig) + if err != nil { + return + } + t.server.HTTP.Handle("/ring", t.ring) + return +} + +func (t *Tempo) initDistributor() (err error) { + t.distributor, err = distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring) + if err != nil { + return + } + + operationNameFunc := nethttp.OperationNameFunc(func(r *http.Request) string { + return r.URL.RequestURI() + }) + t.server.HTTP.Handle("/api/prom/push", middleware.Merge( + middleware.Func(func(handler http.Handler) http.Handler { + return nethttp.Middleware(opentracing.GlobalTracer(), handler, operationNameFunc) + }), + t.httpAuthMiddleware, + ).Wrap(http.HandlerFunc(t.distributor.PushHandler))) + + return +} + +func (t *Tempo) initQuerier() (err error) { + t.querier, err = querier.New(t.cfg.Querier, t.cfg.IngesterClient, t.ring) + if err != nil { + return + } + + operationNameFunc := nethttp.OperationNameFunc(func(r *http.Request) string { + return r.URL.RequestURI() + }) + httpMiddleware := middleware.Merge( + middleware.Func(func(handler http.Handler) http.Handler { + return nethttp.Middleware(opentracing.GlobalTracer(), handler, operationNameFunc) + }), + t.httpAuthMiddleware, + ) + t.server.HTTP.Handle("/api/prom/query", httpMiddleware.Wrap(http.HandlerFunc(t.querier.QueryHandler))) + t.server.HTTP.Handle("/api/prom/label", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler))) + t.server.HTTP.Handle("/api/prom/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler))) + return +} + +func (t *Tempo) initIngester() (err error) { + t.cfg.Ingester.LifecyclerConfig.ListenPort = &t.cfg.Server.GRPCListenPort + t.ingester, err = ingester.New(t.cfg.Ingester) + if err != nil { + return + } + + logproto.RegisterPusherServer(t.server.GRPC, t.ingester) + logproto.RegisterQuerierServer(t.server.GRPC, t.ingester) + grpc_health_v1.RegisterHealthServer(t.server.GRPC, t.ingester) + t.server.HTTP.Path("/ready").Handler(http.HandlerFunc(t.ingester.ReadinessHandler)) + return +} + +func (t *Tempo) stopIngester() { + t.ingester.Shutdown() +} + +type module struct { + deps []moduleName + init func(t *Tempo) error + stop func(t *Tempo) +} + +var modules = map[moduleName]module{ + Server: module{ + init: (*Tempo).initServer, + }, + + Ring: module{ + deps: []moduleName{Server}, + init: (*Tempo).initRing, + }, + + Distributor: module{ + deps: []moduleName{Ring, Server}, + init: (*Tempo).initDistributor, + }, + + Ingester: module{ + deps: []moduleName{Server}, + init: (*Tempo).initIngester, + stop: (*Tempo).stopIngester, + }, + + Querier: module{ + deps: []moduleName{Ring, Server}, + init: (*Tempo).initQuerier, + }, + + All: module{ + deps: []moduleName{Querier, Ingester, Distributor}, + }, +} diff --git a/pkg/tempo/tempo.go b/pkg/tempo/tempo.go new file mode 100644 index 0000000000000000000000000000000000000000..cd62eaf46f864aefa3d8b4e5c83fc73ee2bbeee1 --- /dev/null +++ b/pkg/tempo/tempo.go @@ -0,0 +1,138 @@ +package tempo + +import ( + "flag" + "fmt" + + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "google.golang.org/grpc" + + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util" + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/server" + + "github.com/grafana/tempo/pkg/distributor" + "github.com/grafana/tempo/pkg/ingester" + "github.com/grafana/tempo/pkg/ingester/client" + "github.com/grafana/tempo/pkg/querier" +) + +type Config struct { + Target moduleName `yaml:"target,omitempty"` + AuthEnabled bool `yaml:"auth_enabled,omitempty"` + + Server server.Config `yaml:"server,omitempty"` + Distributor distributor.Config `yaml:"distributor,omitempty"` + Querier querier.Config `yaml:"querier,omitempty"` + IngesterClient client.Config `yaml:"ingester_client,omitempty"` + Ingester ingester.Config `yaml:"ingester,omitempty"` +} + +func (c *Config) RegisterFlags(f *flag.FlagSet) { + c.Server.MetricsNamespace = "tempo" + c.Target = All + f.Var(&c.Target, "target", "target module (default All)") + f.BoolVar(&c.AuthEnabled, "auth.enabled", true, "Set to false to disable auth.") + + c.Server.RegisterFlags(f) + c.Distributor.RegisterFlags(f) + c.Querier.RegisterFlags(f) + c.IngesterClient.RegisterFlags(f) + c.Ingester.RegisterFlags(f) +} + +type Tempo struct { + cfg Config + + server *server.Server + ring *ring.Ring + distributor *distributor.Distributor + ingester *ingester.Ingester + querier *querier.Querier + + httpAuthMiddleware middleware.Interface + + inited map[moduleName]struct{} +} + +func New(cfg Config) (*Tempo, error) { + tempo := &Tempo{ + cfg: cfg, + inited: map[moduleName]struct{}{}, + } + + tempo.setupAuthMiddleware() + + if err := tempo.init(cfg.Target); err != nil { + return nil, err + } + + return tempo, nil +} + +func (t *Tempo) setupAuthMiddleware() { + if t.cfg.AuthEnabled { + t.cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{ + middleware.ServerUserHeaderInterceptor, + } + t.cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{ + middleware.StreamServerUserHeaderInterceptor, + } + t.httpAuthMiddleware = middleware.AuthenticateUser + } else { + t.cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{ + fakeGRPCAuthUniaryMiddleware, + } + t.cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{ + fakeGRPCAuthStreamMiddleware, + } + t.httpAuthMiddleware = fakeHTTPAuthMiddleware + } +} + +func (t *Tempo) init(m moduleName) error { + if _, ok := t.inited[m]; ok { + return nil + } + + for _, dep := range modules[m].deps { + t.init(dep) + } + + level.Info(util.Logger).Log("msg", "initialising", "module", m) + if modules[m].init != nil { + if err := modules[m].init(t); err != nil { + return errors.Wrap(err, fmt.Sprintf("error initialising module: %s", m)) + } + } + + t.inited[m] = struct{}{} + return nil +} + +func (t *Tempo) Run() { + t.server.Run() +} + +func (t *Tempo) Stop() { + t.server.Shutdown() + t.stop(t.cfg.Target) +} + +func (t *Tempo) stop(m moduleName) { + if _, ok := t.inited[m]; !ok { + return + } + delete(t.inited, m) + + for _, dep := range modules[m].deps { + t.stop(dep) + } + + if modules[m].stop != nil { + level.Info(util.Logger).Log("msg", "stopping", "module", m) + modules[m].stop(t) + } +}