diff --git a/.gitignore b/.gitignore index f68efbc2749de84ed1c52c69ee4f2153d8e8d992..b485f6fdfdb4c30908f933a25ed8b23310656871 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,3 @@ cmd/loki/loki cmd/promtail/promtail /loki /promtail -.idea/ \ No newline at end of file diff --git a/pkg/promtail/client/client.go b/pkg/promtail/client/client.go index 7ad3bb7d27afd858e790556f94b04d26b5bd6ffd..bcd5dd8cc07e816504acc798dfc46fe60c2c5023 100644 --- a/pkg/promtail/client/client.go +++ b/pkg/promtail/client/client.go @@ -54,6 +54,7 @@ type Config struct { BackoffConfig util.BackoffConfig `yaml:"backoff_config"` ExternalLabels model.LabelSet `yaml:"external_labels,omitempty"` + Timeout time.Duration `yaml:"timeout"` } // RegisterFlags registers flags. @@ -65,6 +66,7 @@ func (c *Config) RegisterFlags(flags *flag.FlagSet) { flag.IntVar(&c.BackoffConfig.MaxRetries, "client.max-retries", 5, "Maximum number of retires when sending batches.") flag.DurationVar(&c.BackoffConfig.MinBackoff, "client.min-backoff", 100*time.Millisecond, "Initial backoff time between retries.") flag.DurationVar(&c.BackoffConfig.MaxBackoff, "client.max-backoff", 5*time.Second, "Maximum backoff time between retries.") + flag.DurationVar(&c.Timeout, "client.timeout", 10*time.Second, "Maximum time to wait for server to respond to a request") } // Client for pushing logs in snappy-compressed protos over HTTP. @@ -157,6 +159,7 @@ func (c *Client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) { start := time.Now() status, err = c.send(ctx, buf) requestDuration.WithLabelValues(strconv.Itoa(status)).Observe(time.Since(start).Seconds()) + if err == nil { return } @@ -166,7 +169,7 @@ func (c *Client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) { break } - level.Warn(c.logger).Log("msg", "error sending batch", "status", status, "error", err) + level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err) backoff.Wait() } @@ -191,6 +194,8 @@ func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, error) { } func (c *Client) send(ctx context.Context, buf []byte) (int, error) { + ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout) + defer cancel() req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf)) if err != nil { return -1, err diff --git a/pkg/promtail/promtail_test.go b/pkg/promtail/promtail_test.go index c668a7ea1004df15b703fccad12127d815bc68ce..304502940fb45c32917e76c96a9a8cd8c2afe1c9 100644 --- a/pkg/promtail/promtail_test.go +++ b/pkg/promtail/promtail_test.go @@ -18,16 +18,12 @@ import ( "github.com/prometheus/common/model" sd_config "github.com/prometheus/prometheus/discovery/config" "github.com/prometheus/prometheus/discovery/targetgroup" - "github.com/weaveworks/common/server" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/parser" "github.com/grafana/loki/pkg/promtail/api" - "github.com/grafana/loki/pkg/promtail/client" "github.com/grafana/loki/pkg/promtail/config" - "github.com/grafana/loki/pkg/promtail/positions" "github.com/grafana/loki/pkg/promtail/scrape" - "github.com/grafana/loki/pkg/promtail/targets" ) func TestPromtail(t *testing.T) { @@ -350,17 +346,17 @@ func buildTestConfig(t *testing.T, positionsFileName string, logDirName string) t.Fatal("Failed to parse client URL") } - clientConfig := client.Config{ - URL: clientURL, - BatchWait: 10 * time.Millisecond, - BatchSize: 10 * 1024, - ExternalLabels: nil, - } + cfg := config.Config{} + // Init everything with default values. + flagext.RegisterFlags(&cfg) - positionsConfig := positions.Config{ - SyncPeriod: 100 * time.Millisecond, - PositionsFile: positionsFileName, - } + // Override some of those defaults + cfg.ClientConfig.URL = clientURL + cfg.ClientConfig.BatchWait = 10 * time.Millisecond + cfg.ClientConfig.BatchSize = 10 * 1024 + + cfg.PositionsConfig.SyncPeriod = 100 * time.Millisecond + cfg.PositionsConfig.PositionsFile = positionsFileName targetGroup := targetgroup.Group{ Targets: []model.LabelSet{{ @@ -385,21 +381,11 @@ func buildTestConfig(t *testing.T, positionsFileName string, logDirName string) RelabelConfigs: nil, ServiceDiscoveryConfig: serviceConfig, } + cfg.ScrapeConfig = append(cfg.ScrapeConfig, scrapeConfig) - targetConfig := targets.Config{ - SyncPeriod: 10 * time.Millisecond, - } - - return config.Config{ - ServerConfig: server.Config{}, - ClientConfig: clientConfig, - PositionsConfig: positionsConfig, - ScrapeConfig: []scrape.Config{ - scrapeConfig, - }, - TargetConfig: targetConfig, - } + cfg.TargetConfig.SyncPeriod = 10 * time.Millisecond + return cfg } func initRandom() {