From da18cb343a9d5343dc846692df0a8da875c9573a Mon Sep 17 00:00:00 2001
From: Tom Wilkie <tom.wilkie@gmail.com>
Date: Thu, 22 Nov 2018 15:51:30 +0000
Subject: [PATCH] Add option to disable auth when running locally.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
---
 .gitignore             |   2 +-
 cmd/tempo/main.go      | 287 ++++-------------------------------------
 docs/local.yaml        |   2 +
 pkg/tempo/fake_auth.go |  42 ++++++
 pkg/tempo/modules.go   | 184 ++++++++++++++++++++++++++
 pkg/tempo/tempo.go     | 138 ++++++++++++++++++++
 6 files changed, 389 insertions(+), 266 deletions(-)
 create mode 100644 pkg/tempo/fake_auth.go
 create mode 100644 pkg/tempo/modules.go
 create mode 100644 pkg/tempo/tempo.go

diff --git a/.gitignore b/.gitignore
index 4f708e8b..1cdb68f6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,4 +11,4 @@ cmd/promtail/promtail
 mixin/vendor/
 pkg/logproto/logproto.pb.go
 pkg/parser/labels.go
-tempo
+/tempo
diff --git a/cmd/tempo/main.go b/cmd/tempo/main.go
index 7fa1cef9..8242f196 100644
--- a/cmd/tempo/main.go
+++ b/cmd/tempo/main.go
@@ -2,261 +2,47 @@ package main
 
 import (
 	"flag"
-	"fmt"
 	"io/ioutil"
-	"net/http"
 	"os"
-	"strings"
 
 	"github.com/go-kit/kit/log/level"
-	"github.com/opentracing-contrib/go-stdlib/nethttp"
-	opentracing "github.com/opentracing/opentracing-go"
+	"github.com/grafana/tempo/pkg/tempo"
 	"github.com/pkg/errors"
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/health/grpc_health_v1"
 	"gopkg.in/yaml.v2"
 
-	"github.com/cortexproject/cortex/pkg/ring"
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/flagext"
-	"github.com/weaveworks/common/middleware"
-	"github.com/weaveworks/common/server"
-
-	"github.com/grafana/tempo/pkg/distributor"
-	"github.com/grafana/tempo/pkg/ingester"
-	"github.com/grafana/tempo/pkg/ingester/client"
-	"github.com/grafana/tempo/pkg/logproto"
-	"github.com/grafana/tempo/pkg/querier"
-)
-
-type config struct {
-	Server         server.Config      `yaml:"server,omitempty"`
-	Distributor    distributor.Config `yaml:"distributor,omitempty"`
-	Querier        querier.Config     `yaml:"querier,omitempty"`
-	IngesterClient client.Config      `yaml:"ingester_client,omitempty"`
-	Ingester       ingester.Config    `yaml:"ingester,omitempty"`
-}
-
-func (c *config) RegisterFlags(f *flag.FlagSet) {
-	c.Server.MetricsNamespace = "tempo"
-	c.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{
-		middleware.ServerUserHeaderInterceptor,
-	}
-	c.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{
-		middleware.StreamServerUserHeaderInterceptor,
-	}
-
-	c.Server.RegisterFlags(f)
-	c.Distributor.RegisterFlags(f)
-	c.Querier.RegisterFlags(f)
-	c.IngesterClient.RegisterFlags(f)
-	c.Ingester.RegisterFlags(f)
-}
-
-type Tempo struct {
-	server      *server.Server
-	ring        *ring.Ring
-	distributor *distributor.Distributor
-	ingester    *ingester.Ingester
-	querier     *querier.Querier
-}
-
-type moduleName int
-
-const (
-	Ring moduleName = iota
-	Server
-	Distributor
-	Ingester
-	Querier
-	All
-)
-
-func (m moduleName) String() string {
-	switch m {
-	case Ring:
-		return "ring"
-	case Server:
-		return "server"
-	case Distributor:
-		return "distributor"
-	case Ingester:
-		return "ingester"
-	case Querier:
-		return "querier"
-	case All:
-		return "all"
-	default:
-		panic(fmt.Sprintf("unknow module name: %d", m))
-	}
-}
-
-func (m *moduleName) Set(s string) error {
-	switch strings.ToLower(s) {
-	case "ring":
-		*m = Ring
-		return nil
-	case "server":
-		*m = Server
-		return nil
-	case "distributor":
-		*m = Distributor
-		return nil
-	case "ingester":
-		*m = Ingester
-		return nil
-	case "querier":
-		*m = Querier
-		return nil
-	case "all":
-		*m = All
-		return nil
-	default:
-		return fmt.Errorf("unrecognised module name: %s", s)
-	}
-}
-
-type module struct {
-	deps []moduleName
-	init func(t *Tempo, cfg *config) error
-	stop func(t *Tempo)
-}
-
-var modules = map[moduleName]module{
-	Server: module{
-		init: func(t *Tempo, cfg *config) (err error) {
-			t.server, err = server.New(cfg.Server)
-			return
-		},
-	},
-
-	Ring: module{
-		deps: []moduleName{Server},
-		init: func(t *Tempo, cfg *config) (err error) {
-			t.ring, err = ring.New(cfg.Ingester.LifecyclerConfig.RingConfig)
-			if err != nil {
-				return
-			}
-			t.server.HTTP.Handle("/ring", t.ring)
-			return
-		},
-	},
-
-	Distributor: module{
-		deps: []moduleName{Ring, Server},
-		init: func(t *Tempo, cfg *config) (err error) {
-			t.distributor, err = distributor.New(cfg.Distributor, cfg.IngesterClient, t.ring)
-			if err != nil {
-				return
-			}
-
-			operationNameFunc := nethttp.OperationNameFunc(func(r *http.Request) string {
-				return r.URL.RequestURI()
-			})
-			t.server.HTTP.Handle("/api/prom/push", middleware.Merge(
-				middleware.Func(func(handler http.Handler) http.Handler {
-					return nethttp.Middleware(opentracing.GlobalTracer(), handler, operationNameFunc)
-				}),
-				middleware.AuthenticateUser,
-			).Wrap(http.HandlerFunc(t.distributor.PushHandler)))
-
-			return
-		},
-	},
-
-	Ingester: module{
-		deps: []moduleName{Server},
-		init: func(t *Tempo, cfg *config) (err error) {
-			cfg.Ingester.LifecyclerConfig.ListenPort = &cfg.Server.GRPCListenPort
-			t.ingester, err = ingester.New(cfg.Ingester)
-			if err != nil {
-				return
-			}
-
-			logproto.RegisterPusherServer(t.server.GRPC, t.ingester)
-			logproto.RegisterQuerierServer(t.server.GRPC, t.ingester)
-			grpc_health_v1.RegisterHealthServer(t.server.GRPC, t.ingester)
-			t.server.HTTP.Path("/ready").Handler(http.HandlerFunc(t.ingester.ReadinessHandler))
-			return
-		},
-		stop: func(t *Tempo) {
-			t.ingester.Shutdown()
-		},
-	},
-
-	Querier: module{
-		deps: []moduleName{Ring, Server},
-		init: func(t *Tempo, cfg *config) (err error) {
-			t.querier, err = querier.New(cfg.Querier, cfg.IngesterClient, t.ring)
-			if err != nil {
-				return
-			}
-
-			operationNameFunc := nethttp.OperationNameFunc(func(r *http.Request) string {
-				return r.URL.RequestURI()
-			})
-			httpMiddleware := middleware.Merge(
-				middleware.Func(func(handler http.Handler) http.Handler {
-					return nethttp.Middleware(opentracing.GlobalTracer(), handler, operationNameFunc)
-				}),
-				middleware.AuthenticateUser,
-			)
-			t.server.HTTP.Handle("/api/prom/query", httpMiddleware.Wrap(http.HandlerFunc(t.querier.QueryHandler)))
-			t.server.HTTP.Handle("/api/prom/label", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler)))
-			t.server.HTTP.Handle("/api/prom/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler)))
-			return
-		},
-	},
-
-	All: module{
-		deps: []moduleName{Querier, Ingester, Distributor},
-		init: func(t *Tempo, cfg *config) (err error) {
-			return
-		},
-	},
-}
-
-var (
-	cfg    config
-	tempo  Tempo
-	inited = map[moduleName]struct{}{}
 )
 
-func initModule(m moduleName) error {
-	if _, ok := inited[m]; ok {
-		return nil
-	}
-
-	for _, dep := range modules[m].deps {
-		initModule(dep)
-	}
-
-	level.Info(util.Logger).Log("msg", "initialising", "module", m)
-	if err := modules[m].init(&tempo, &cfg); err != nil {
-		return errors.Wrap(err, fmt.Sprintf("error initialising module: %s", m))
-	}
+func main() {
+	var (
+		cfg        tempo.Config
+		configFile = ""
+	)
+	flag.StringVar(&configFile, "config.file", "", "Configuration file to load.")
+	flagext.RegisterFlags(&cfg)
+	flag.Parse()
 
-	inited[m] = struct{}{}
-	return nil
-}
+	util.InitLogger(&cfg.Server)
 
-func stopModule(m moduleName) {
-	if _, ok := inited[m]; !ok {
-		return
+	if configFile != "" {
+		if err := readConfig(configFile, &cfg); err != nil {
+			level.Error(util.Logger).Log("msg", "error loading config", "filename", configFile, "err", err)
+			os.Exit(1)
+		}
 	}
-	delete(inited, m)
 
-	for _, dep := range modules[m].deps {
-		stopModule(dep)
+	t, err := tempo.New(cfg)
+	if err != nil {
+		level.Error(util.Logger).Log("msg", "error initialising module", "err", err)
+		os.Exit(1)
 	}
 
-	if modules[m].stop != nil {
-		level.Info(util.Logger).Log("msg", "stopping", "module", m)
-		modules[m].stop(&tempo)
-	}
+	t.Run()
+	t.Stop()
 }
 
-func readConfig(filename string) error {
+func readConfig(filename string, cfg *tempo.Config) error {
 	f, err := os.Open(filename)
 	if err != nil {
 		return errors.Wrap(err, "error opening config file")
@@ -273,32 +59,3 @@ func readConfig(filename string) error {
 	}
 	return nil
 }
-
-func main() {
-	var (
-		target     = All
-		configFile = ""
-	)
-	flag.Var(&target, "target", "target module (default All)")
-	flag.StringVar(&configFile, "config.file", "", "Configuration file to load.")
-	flagext.RegisterFlags(&cfg)
-	flag.Parse()
-
-	util.InitLogger(&cfg.Server)
-
-	if configFile != "" {
-		if err := readConfig(configFile); err != nil {
-			level.Error(util.Logger).Log("msg", "error loading config", "filename", configFile, "err", err)
-			os.Exit(1)
-		}
-	}
-
-	if err := initModule(target); err != nil {
-		level.Error(util.Logger).Log("msg", "error initialising module", "err", err)
-		os.Exit(1)
-	}
-
-	tempo.server.Run()
-	tempo.server.Shutdown()
-	stopModule(target)
-}
diff --git a/docs/local.yaml b/docs/local.yaml
index 5e0fe018..93a6f6fc 100644
--- a/docs/local.yaml
+++ b/docs/local.yaml
@@ -1,3 +1,5 @@
+auth_enabled: false
+
 ingester:
   lifecycler:
     ring:
diff --git a/pkg/tempo/fake_auth.go b/pkg/tempo/fake_auth.go
new file mode 100644
index 00000000..849f513b
--- /dev/null
+++ b/pkg/tempo/fake_auth.go
@@ -0,0 +1,42 @@
+package tempo
+
+import (
+	"context"
+	"net/http"
+
+	"github.com/weaveworks/common/middleware"
+	"github.com/weaveworks/common/user"
+	"google.golang.org/grpc"
+)
+
+// Fake auth middlewares just injects a fake userID, so the rest of the code
+// can continue to be multitenant.
+
+var fakeHTTPAuthMiddleware = middleware.Func(func(next http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		ctx := user.InjectOrgID(r.Context(), "fake")
+		next.ServeHTTP(w, r.WithContext(ctx))
+	})
+})
+
+var fakeGRPCAuthUniaryMiddleware = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+	ctx = user.InjectOrgID(ctx, "fake")
+	return handler(ctx, req)
+}
+
+var fakeGRPCAuthStreamMiddleware = func(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+	ctx := user.InjectOrgID(ss.Context(), "fake")
+	return handler(srv, serverStream{
+		ctx:          ctx,
+		ServerStream: ss,
+	})
+}
+
+type serverStream struct {
+	ctx context.Context
+	grpc.ServerStream
+}
+
+func (ss serverStream) Context() context.Context {
+	return ss.ctx
+}
diff --git a/pkg/tempo/modules.go b/pkg/tempo/modules.go
new file mode 100644
index 00000000..ddfd383f
--- /dev/null
+++ b/pkg/tempo/modules.go
@@ -0,0 +1,184 @@
+package tempo
+
+import (
+	"fmt"
+	"net/http"
+	"strings"
+
+	"github.com/opentracing-contrib/go-stdlib/nethttp"
+	opentracing "github.com/opentracing/opentracing-go"
+	"google.golang.org/grpc/health/grpc_health_v1"
+
+	"github.com/cortexproject/cortex/pkg/ring"
+	"github.com/weaveworks/common/middleware"
+	"github.com/weaveworks/common/server"
+
+	"github.com/grafana/tempo/pkg/distributor"
+	"github.com/grafana/tempo/pkg/ingester"
+	"github.com/grafana/tempo/pkg/logproto"
+	"github.com/grafana/tempo/pkg/querier"
+)
+
+type moduleName int
+
+const (
+	Ring moduleName = iota
+	Server
+	Distributor
+	Ingester
+	Querier
+	All
+)
+
+func (m moduleName) String() string {
+	switch m {
+	case Ring:
+		return "ring"
+	case Server:
+		return "server"
+	case Distributor:
+		return "distributor"
+	case Ingester:
+		return "ingester"
+	case Querier:
+		return "querier"
+	case All:
+		return "all"
+	default:
+		panic(fmt.Sprintf("unknow module name: %d", m))
+	}
+}
+
+func (m *moduleName) Set(s string) error {
+	switch strings.ToLower(s) {
+	case "ring":
+		*m = Ring
+		return nil
+	case "server":
+		*m = Server
+		return nil
+	case "distributor":
+		*m = Distributor
+		return nil
+	case "ingester":
+		*m = Ingester
+		return nil
+	case "querier":
+		*m = Querier
+		return nil
+	case "all":
+		*m = All
+		return nil
+	default:
+		return fmt.Errorf("unrecognised module name: %s", s)
+	}
+}
+
+func (t *Tempo) initServer() (err error) {
+	t.server, err = server.New(t.cfg.Server)
+	return
+}
+
+func (t *Tempo) initRing() (err error) {
+	t.ring, err = ring.New(t.cfg.Ingester.LifecyclerConfig.RingConfig)
+	if err != nil {
+		return
+	}
+	t.server.HTTP.Handle("/ring", t.ring)
+	return
+}
+
+func (t *Tempo) initDistributor() (err error) {
+	t.distributor, err = distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring)
+	if err != nil {
+		return
+	}
+
+	operationNameFunc := nethttp.OperationNameFunc(func(r *http.Request) string {
+		return r.URL.RequestURI()
+	})
+	t.server.HTTP.Handle("/api/prom/push", middleware.Merge(
+		middleware.Func(func(handler http.Handler) http.Handler {
+			return nethttp.Middleware(opentracing.GlobalTracer(), handler, operationNameFunc)
+		}),
+		t.httpAuthMiddleware,
+	).Wrap(http.HandlerFunc(t.distributor.PushHandler)))
+
+	return
+}
+
+func (t *Tempo) initQuerier() (err error) {
+	t.querier, err = querier.New(t.cfg.Querier, t.cfg.IngesterClient, t.ring)
+	if err != nil {
+		return
+	}
+
+	operationNameFunc := nethttp.OperationNameFunc(func(r *http.Request) string {
+		return r.URL.RequestURI()
+	})
+	httpMiddleware := middleware.Merge(
+		middleware.Func(func(handler http.Handler) http.Handler {
+			return nethttp.Middleware(opentracing.GlobalTracer(), handler, operationNameFunc)
+		}),
+		t.httpAuthMiddleware,
+	)
+	t.server.HTTP.Handle("/api/prom/query", httpMiddleware.Wrap(http.HandlerFunc(t.querier.QueryHandler)))
+	t.server.HTTP.Handle("/api/prom/label", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler)))
+	t.server.HTTP.Handle("/api/prom/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler)))
+	return
+}
+
+func (t *Tempo) initIngester() (err error) {
+	t.cfg.Ingester.LifecyclerConfig.ListenPort = &t.cfg.Server.GRPCListenPort
+	t.ingester, err = ingester.New(t.cfg.Ingester)
+	if err != nil {
+		return
+	}
+
+	logproto.RegisterPusherServer(t.server.GRPC, t.ingester)
+	logproto.RegisterQuerierServer(t.server.GRPC, t.ingester)
+	grpc_health_v1.RegisterHealthServer(t.server.GRPC, t.ingester)
+	t.server.HTTP.Path("/ready").Handler(http.HandlerFunc(t.ingester.ReadinessHandler))
+	return
+}
+
+func (t *Tempo) stopIngester() {
+	t.ingester.Shutdown()
+}
+
+type module struct {
+	deps []moduleName
+	init func(t *Tempo) error
+	stop func(t *Tempo)
+}
+
+var modules = map[moduleName]module{
+	Server: module{
+		init: (*Tempo).initServer,
+	},
+
+	Ring: module{
+		deps: []moduleName{Server},
+		init: (*Tempo).initRing,
+	},
+
+	Distributor: module{
+		deps: []moduleName{Ring, Server},
+		init: (*Tempo).initDistributor,
+	},
+
+	Ingester: module{
+		deps: []moduleName{Server},
+		init: (*Tempo).initIngester,
+		stop: (*Tempo).stopIngester,
+	},
+
+	Querier: module{
+		deps: []moduleName{Ring, Server},
+		init: (*Tempo).initQuerier,
+	},
+
+	All: module{
+		deps: []moduleName{Querier, Ingester, Distributor},
+	},
+}
diff --git a/pkg/tempo/tempo.go b/pkg/tempo/tempo.go
new file mode 100644
index 00000000..cd62eaf4
--- /dev/null
+++ b/pkg/tempo/tempo.go
@@ -0,0 +1,138 @@
+package tempo
+
+import (
+	"flag"
+	"fmt"
+
+	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
+	"google.golang.org/grpc"
+
+	"github.com/cortexproject/cortex/pkg/ring"
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/weaveworks/common/middleware"
+	"github.com/weaveworks/common/server"
+
+	"github.com/grafana/tempo/pkg/distributor"
+	"github.com/grafana/tempo/pkg/ingester"
+	"github.com/grafana/tempo/pkg/ingester/client"
+	"github.com/grafana/tempo/pkg/querier"
+)
+
+type Config struct {
+	Target      moduleName `yaml:"target,omitempty"`
+	AuthEnabled bool       `yaml:"auth_enabled,omitempty"`
+
+	Server         server.Config      `yaml:"server,omitempty"`
+	Distributor    distributor.Config `yaml:"distributor,omitempty"`
+	Querier        querier.Config     `yaml:"querier,omitempty"`
+	IngesterClient client.Config      `yaml:"ingester_client,omitempty"`
+	Ingester       ingester.Config    `yaml:"ingester,omitempty"`
+}
+
+func (c *Config) RegisterFlags(f *flag.FlagSet) {
+	c.Server.MetricsNamespace = "tempo"
+	c.Target = All
+	f.Var(&c.Target, "target", "target module (default All)")
+	f.BoolVar(&c.AuthEnabled, "auth.enabled", true, "Set to false to disable auth.")
+
+	c.Server.RegisterFlags(f)
+	c.Distributor.RegisterFlags(f)
+	c.Querier.RegisterFlags(f)
+	c.IngesterClient.RegisterFlags(f)
+	c.Ingester.RegisterFlags(f)
+}
+
+type Tempo struct {
+	cfg Config
+
+	server      *server.Server
+	ring        *ring.Ring
+	distributor *distributor.Distributor
+	ingester    *ingester.Ingester
+	querier     *querier.Querier
+
+	httpAuthMiddleware middleware.Interface
+
+	inited map[moduleName]struct{}
+}
+
+func New(cfg Config) (*Tempo, error) {
+	tempo := &Tempo{
+		cfg:    cfg,
+		inited: map[moduleName]struct{}{},
+	}
+
+	tempo.setupAuthMiddleware()
+
+	if err := tempo.init(cfg.Target); err != nil {
+		return nil, err
+	}
+
+	return tempo, nil
+}
+
+func (t *Tempo) setupAuthMiddleware() {
+	if t.cfg.AuthEnabled {
+		t.cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{
+			middleware.ServerUserHeaderInterceptor,
+		}
+		t.cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{
+			middleware.StreamServerUserHeaderInterceptor,
+		}
+		t.httpAuthMiddleware = middleware.AuthenticateUser
+	} else {
+		t.cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{
+			fakeGRPCAuthUniaryMiddleware,
+		}
+		t.cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{
+			fakeGRPCAuthStreamMiddleware,
+		}
+		t.httpAuthMiddleware = fakeHTTPAuthMiddleware
+	}
+}
+
+func (t *Tempo) init(m moduleName) error {
+	if _, ok := t.inited[m]; ok {
+		return nil
+	}
+
+	for _, dep := range modules[m].deps {
+		t.init(dep)
+	}
+
+	level.Info(util.Logger).Log("msg", "initialising", "module", m)
+	if modules[m].init != nil {
+		if err := modules[m].init(t); err != nil {
+			return errors.Wrap(err, fmt.Sprintf("error initialising module: %s", m))
+		}
+	}
+
+	t.inited[m] = struct{}{}
+	return nil
+}
+
+func (t *Tempo) Run() {
+	t.server.Run()
+}
+
+func (t *Tempo) Stop() {
+	t.server.Shutdown()
+	t.stop(t.cfg.Target)
+}
+
+func (t *Tempo) stop(m moduleName) {
+	if _, ok := t.inited[m]; !ok {
+		return
+	}
+	delete(t.inited, m)
+
+	for _, dep := range modules[m].deps {
+		t.stop(dep)
+	}
+
+	if modules[m].stop != nil {
+		level.Info(util.Logger).Log("msg", "stopping", "module", m)
+		modules[m].stop(t)
+	}
+}
-- 
GitLab