From fad57e299e4381330423e6ec5ec505aafe9afab1 Mon Sep 17 00:00:00 2001 From: Tom Wilkie <tomwilkie@users.noreply.github.com> Date: Thu, 31 Jan 2019 19:40:24 +0100 Subject: [PATCH] Fix validation; allow 'metrics' without a metric name. (#268) * Update cortex to include optionally-enforce-metric-name Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com> * Configure validation to allow 'metrics' without metric names. Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com> --- Gopkg.lock | 8 +- Gopkg.toml | 3 +- cmd/loki/loki-local-config.yaml | 5 +- production/helm/templates/loki/configmap.yaml | 3 + production/ksonnet/loki/config.libsonnet | 4 + .../pkg/chunk/aws/dynamodb_storage_client.go | 18 +- .../pkg/chunk/aws/dynamodb_table_client.go | 48 +++- .../pkg/chunk/aws/metrics_autoscaling.go | 269 ++++++++++++------ .../cortex/pkg/chunk/aws/s3_storage_client.go | 3 +- .../pkg/chunk/cassandra/storage_client.go | 3 +- .../cortexproject/cortex/pkg/chunk/chunk.go | 26 +- .../cortex/pkg/chunk/chunk_store.go | 5 +- .../cortex/pkg/chunk/chunk_store_utils.go | 2 +- .../pkg/chunk/gcp/bigtable_object_client.go | 3 +- .../cortex/pkg/chunk/gcp/gcs_object_client.go | 2 +- .../pkg/chunk/inmemory_storage_client.go | 2 +- .../pkg/chunk/local/fs_object_client.go | 3 +- .../cortex/pkg/chunk/schema_config.go | 52 +++- .../cortex/pkg/chunk/series_store.go | 5 +- .../cortex/pkg/chunk/table_client.go | 24 +- .../cortex/pkg/chunk/table_manager.go | 36 ++- .../cortex/pkg/chunk/testutils/testutils.go | 6 +- .../cortex/pkg/ingester/index/index.go | 6 +- .../cortex/pkg/util/validation/limits.go | 2 + .../cortex/pkg/util/validation/override.go | 7 + .../cortex/pkg/util/validation/validate.go | 12 +- 26 files changed, 394 insertions(+), 163 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 50fbe3b3..e0899145 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -163,8 +163,8 @@ revision = "3a0bb77429bd3a61596f5e8a3172445844342120" [[projects]] - branch = "master" - digest = "1:e7525991325b400ecf4f6c6383e965b12801b87e2f687afda23f57f091f9fcc0" + branch = "optionally-enforce-metric-name" + digest = "1:2cced64caeee7972668c48e0a80e98f09489bd3a7a61c470fd0ca36cb5e24344" name = "github.com/cortexproject/cortex" packages = [ "pkg/chunk", @@ -191,7 +191,8 @@ "pkg/util/wire", ] pruneopts = "UT" - revision = "4c9e2025ab6733e0f42187b64d2522d558f9f31c" + revision = "16a08e037bce5343c5692aa1015e76b172ddf917" + source = "https://github.com/grafana/cortex.git" [[projects]] digest = "1:ffe9824d294da03b391f44e1ae8281281b4afc1bdaa9588c9097785e3af10cec" @@ -1344,6 +1345,7 @@ "github.com/prometheus/prometheus/relabel", "github.com/stretchr/testify/assert", "github.com/stretchr/testify/require", + "github.com/weaveworks/common/httpgrpc", "github.com/weaveworks/common/middleware", "github.com/weaveworks/common/server", "github.com/weaveworks/common/user", diff --git a/Gopkg.toml b/Gopkg.toml index b46c93b4..0a5128f2 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -26,7 +26,8 @@ [[constraint]] name = "github.com/cortexproject/cortex" - branch = "master" + branch = "optionally-enforce-metric-name" + source = "https://github.com/grafana/cortex.git" [[constraint]] name = "github.com/weaveworks/common" diff --git a/cmd/loki/loki-local-config.yaml b/cmd/loki/loki-local-config.yaml index 1092dc07..ad31034e 100644 --- a/cmd/loki/loki-local-config.yaml +++ b/cmd/loki/loki-local-config.yaml @@ -25,4 +25,7 @@ storage_config: directory: /tmp/loki/index filesystem: - directory: /tmp/loki/chunks \ No newline at end of file + directory: /tmp/loki/chunks + +limits_config: + 
enforce_metric_name: false \ No newline at end of file diff --git a/production/helm/templates/loki/configmap.yaml b/production/helm/templates/loki/configmap.yaml index 12718cee..f7eba41f 100644 --- a/production/helm/templates/loki/configmap.yaml +++ b/production/helm/templates/loki/configmap.yaml @@ -14,6 +14,9 @@ data: server: http_listen_port: {{ .Values.loki.service.port }} + limits_config: + enforce_metric_name: false + ingester: lifecycler: ring: diff --git a/production/ksonnet/loki/config.libsonnet b/production/ksonnet/loki/config.libsonnet index e968239b..081f8c11 100644 --- a/production/ksonnet/loki/config.libsonnet +++ b/production/ksonnet/loki/config.libsonnet @@ -29,6 +29,10 @@ grpc_server_max_recv_msg_size: 1024 * 1024 * 64, }, + limits_config: { + enforce_metric_name: false, + }, + ingester: { lifecycler: { ring: { diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go index 05485e54..77f32106 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go @@ -286,9 +286,6 @@ func (a dynamoDBStorageClient) QueryPages(ctx context.Context, queries []chunk.I } func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { - sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) - defer sp.Finish() - input := &dynamodb.QueryInput{ TableName: aws.String(query.TableName), KeyConditions: map[string]*dynamodb.Condition{ @@ -337,7 +334,7 @@ func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery for page := request; page != nil; page = page.NextPage() { pageCount++ - response, err := a.queryPage(ctx, input, page) + response, err := a.queryPage(ctx, input, page, query.HashValue, pageCount) if err != nil { return err } @@ -355,7 +352,7 @@ func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery return nil } -func (a dynamoDBStorageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (*dynamoDBReadResponse, error) { +func (a dynamoDBStorageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest, hashValue string, pageCount int) (*dynamoDBReadResponse, error) { backoff := util.NewBackoff(ctx, a.cfg.backoffConfig) defer func() { dynamoQueryRetryCount.WithLabelValues("queryPage").Observe(float64(backoff.NumRetries())) @@ -363,7 +360,13 @@ func (a dynamoDBStorageClient) queryPage(ctx context.Context, input *dynamodb.Qu var err error for backoff.Ongoing() { - err = instrument.CollectedRequest(ctx, "DynamoDB.QueryPages", dynamoRequestDuration, instrument.ErrorCode, func(_ context.Context) error { + err = instrument.CollectedRequest(ctx, "DynamoDB.QueryPages", dynamoRequestDuration, instrument.ErrorCode, func(innerCtx context.Context) error { + if sp := ot.SpanFromContext(innerCtx); sp != nil { + sp.SetTag("tableName", aws.StringValue(input.TableName)) + sp.SetTag("hashValue", hashValue) + sp.SetTag("page", pageCount) + sp.SetTag("retry", backoff.NumRetries()) + } return page.Send() }) @@ -637,8 +640,7 @@ func (a dynamoDBStorageClient) PutChunks(ctx context.Context, chunks []chunk.Chu ) for i := range chunks { - // Encode the chunk first - checksum is calculated as a 
side effect. - buf, err := chunks[i].Encode() + buf, err := chunks[i].Encoded() if err != nil { return err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_table_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_table_client.go index bbb2d42b..82da393c 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_table_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_table_client.go @@ -230,6 +230,9 @@ func (d dynamoTableClient) DescribeTable(ctx context.Context, name string) (desc if out.Table.TableStatus != nil { isActive = (*out.Table.TableStatus == dynamodb.TableStatusActive) } + if out.Table.BillingModeSummary != nil { + desc.UseOnDemandIOMode = *out.Table.BillingModeSummary.BillingMode == dynamodb.BillingModePayPerRequest + } tableARN = out.Table.TableArn } return err @@ -268,18 +271,53 @@ func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected ch return err } } - - if current.ProvisionedRead != expected.ProvisionedRead || current.ProvisionedWrite != expected.ProvisionedWrite { + level.Debug(util.Logger).Log("msg", "Updating Table", + "expectedWrite", expected.ProvisionedWrite, + "currentWrite", current.ProvisionedWrite, + "expectedRead", expected.ProvisionedRead, + "currentRead", current.ProvisionedRead, + "expectedOnDemandMode", expected.UseOnDemandIOMode, + "currentOnDemandMode", current.UseOnDemandIOMode) + if (current.ProvisionedRead != expected.ProvisionedRead || + current.ProvisionedWrite != expected.ProvisionedWrite) && + !expected.UseOnDemandIOMode { level.Info(util.Logger).Log("msg", "updating provisioned throughput on table", "table", expected.Name, "old_read", current.ProvisionedRead, "old_write", current.ProvisionedWrite, "new_read", expected.ProvisionedRead, "new_write", expected.ProvisionedWrite) if err := d.backoffAndRetry(ctx, func(ctx context.Context) error { return instrument.CollectedRequest(ctx, "DynamoDB.UpdateTable", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { - _, err := d.DynamoDB.UpdateTableWithContext(ctx, &dynamodb.UpdateTableInput{ - TableName: aws.String(expected.Name), + var dynamoBillingMode string + updateTableInput := &dynamodb.UpdateTableInput{TableName: aws.String(expected.Name), ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ ReadCapacityUnits: aws.Int64(expected.ProvisionedRead), WriteCapacityUnits: aws.Int64(expected.ProvisionedWrite), }, - }) + } + // we need this to be a separate check for the billing mode, as aws returns + // an error if we set a table to the billing mode it is currently on. 
+ if current.UseOnDemandIOMode != expected.UseOnDemandIOMode { + dynamoBillingMode = dynamodb.BillingModeProvisioned + level.Info(util.Logger).Log("msg", "updating billing mode on table", "table", expected.Name, "old_mode", current.UseOnDemandIOMode, "new_mode", expected.UseOnDemandIOMode) + updateTableInput.BillingMode = aws.String(dynamoBillingMode) + } + + _, err := d.DynamoDB.UpdateTableWithContext(ctx, updateTableInput) + return err + }) + }); err != nil { + recordDynamoError(expected.Name, err, "DynamoDB.UpdateTable") + if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "LimitExceededException" { + level.Warn(util.Logger).Log("msg", "update limit exceeded", "err", err) + } else { + return err + } + } + } else if expected.UseOnDemandIOMode && current.UseOnDemandIOMode != expected.UseOnDemandIOMode { + // moved the enabling of OnDemand mode to it's own block to reduce complexities & interactions with the various + // settings used in provisioned mode. Unfortunately the boilerplate wrappers for retry and tracking needed to be copied. + if err := d.backoffAndRetry(ctx, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.UpdateTable", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + level.Info(util.Logger).Log("msg", "updating billing mode on table", "table", expected.Name, "old_mode", current.UseOnDemandIOMode, "new_mode", expected.UseOnDemandIOMode) + updateTableInput := &dynamodb.UpdateTableInput{TableName: aws.String(expected.Name), BillingMode: aws.String(dynamodb.BillingModePayPerRequest)} + _, err := d.DynamoDB.UpdateTableWithContext(ctx, updateTableInput) return err }) }); err != nil { diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/metrics_autoscaling.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/metrics_autoscaling.go index e4385a49..153bf1a3 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/metrics_autoscaling.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/metrics_autoscaling.go @@ -33,6 +33,10 @@ const ( // fetch write capacity usage per DynamoDB table // use the rate over 15 minutes so we take a broad average defaultUsageQuery = `sum(rate(cortex_dynamo_consumed_capacity_total{operation="DynamoDB.BatchWriteItem"}[15m])) by (table) > 0` + // use the read rate over 1hr so we take a broad average + defaultReadUsageQuery = `sum(rate(cortex_dynamo_consumed_capacity_total{operation="DynamoDB.QueryPages"}[1h])) by (table) > 0` + // fetch read error rate per DynamoDB table + defaultReadErrorQuery = `sum(increase(cortex_dynamo_failures_total{operation="DynamoDB.QueryPages",error="ProvisionedThroughputExceededException"}[1m])) by (table) > 0` ) // MetricsAutoScalingConfig holds parameters to configure how it works @@ -43,6 +47,8 @@ type MetricsAutoScalingConfig struct { QueueLengthQuery string // Promql query to fetch ingester queue length ErrorRateQuery string // Promql query to fetch error rates per table UsageQuery string // Promql query to fetch write capacity usage per table + ReadUsageQuery string // Promql query to fetch read usage per table + ReadErrorQuery string // Promql query to fetch read errors per table } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -53,16 +59,21 @@ func (cfg *MetricsAutoScalingConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.QueueLengthQuery, "metrics.queue-length-query", defaultQueueLenQuery, "query to fetch ingester queue length") f.StringVar(&cfg.ErrorRateQuery, 
"metrics.error-rate-query", defaultErrorRateQuery, "query to fetch error rates per table") f.StringVar(&cfg.UsageQuery, "metrics.usage-query", defaultUsageQuery, "query to fetch write capacity usage per table") + f.StringVar(&cfg.ReadUsageQuery, "metrics.read-usage-query", defaultReadUsageQuery, "query to fetch read capacity usage per table") + f.StringVar(&cfg.ReadErrorQuery, "metrics.read-error-query", defaultReadErrorQuery, "query to fetch read errors per table") } type metricsData struct { - cfg MetricsAutoScalingConfig - promAPI promV1.API - promLastQuery time.Time - tableLastUpdated map[string]time.Time - queueLengths []float64 - errorRates map[string]float64 - usageRates map[string]float64 + cfg MetricsAutoScalingConfig + promAPI promV1.API + promLastQuery time.Time + tableLastUpdated map[string]time.Time + tableReadLastUpdated map[string]time.Time + queueLengths []float64 + errorRates map[string]float64 + usageRates map[string]float64 + usageReadRates map[string]float64 + readErrorRates map[string]float64 } func newMetrics(cfg DynamoDBConfig) (*metricsData, error) { @@ -71,9 +82,10 @@ func newMetrics(cfg DynamoDBConfig) (*metricsData, error) { return nil, err } return &metricsData{ - promAPI: promV1.NewAPI(client), - cfg: cfg.Metrics, - tableLastUpdated: make(map[string]time.Time), + promAPI: promV1.NewAPI(client), + cfg: cfg.Metrics, + tableLastUpdated: make(map[string]time.Time), + tableReadLastUpdated: make(map[string]time.Time), }, nil } @@ -86,106 +98,183 @@ func (m *metricsData) DescribeTable(ctx context.Context, desc *chunk.TableDesc) } func (m *metricsData) UpdateTable(ctx context.Context, current chunk.TableDesc, expected *chunk.TableDesc) error { - // If we don't take explicit action, return the current provision as the expected provision - expected.ProvisionedWrite = current.ProvisionedWrite - if !expected.WriteScale.Enabled { - return nil - } if err := m.update(ctx); err != nil { return err } - errorRate := m.errorRates[expected.Name] - usageRate := m.usageRates[expected.Name] - - level.Info(util.Logger).Log("msg", "checking metrics", "table", current.Name, "queueLengths", fmt.Sprint(m.queueLengths), "errorRate", errorRate, "usageRate", usageRate) - - switch { - case errorRate < errorFractionScaledown*float64(current.ProvisionedWrite) && - m.queueLengths[2] < float64(m.cfg.TargetQueueLen)*targetScaledown: - // No big queue, low errors -> scale down - m.scaleDownWrite(current, expected, m.computeScaleDown(current, *expected), "metrics scale-down") - case errorRate == 0 && - m.queueLengths[2] < m.queueLengths[1] && m.queueLengths[1] < m.queueLengths[0]: - // zero errors and falling queue -> scale down to current usage - m.scaleDownWrite(current, expected, m.computeScaleDown(current, *expected), "zero errors scale-down") - case errorRate > 0 && m.queueLengths[2] > float64(m.cfg.TargetQueueLen)*targetMax: - // Too big queue, some errors -> scale up - m.scaleUpWrite(current, expected, m.computeScaleUp(current, *expected), "metrics max queue scale-up") - case errorRate > 0 && - m.queueLengths[2] > float64(m.cfg.TargetQueueLen) && - m.queueLengths[2] > m.queueLengths[1] && m.queueLengths[1] > m.queueLengths[0]: - // Growing queue, some errors -> scale up - m.scaleUpWrite(current, expected, m.computeScaleUp(current, *expected), "metrics queue growing scale-up") + if expected.WriteScale.Enabled { + // default if no action is taken is to use the currently provisioned setting + expected.ProvisionedWrite = current.ProvisionedWrite + + errorRate := m.errorRates[expected.Name] + 
usageRate := m.usageRates[expected.Name] + + level.Info(util.Logger).Log("msg", "checking write metrics", "table", current.Name, "queueLengths", fmt.Sprint(m.queueLengths), "errorRate", errorRate, "usageRate", usageRate) + + switch { + case errorRate < errorFractionScaledown*float64(current.ProvisionedWrite) && + m.queueLengths[2] < float64(m.cfg.TargetQueueLen)*targetScaledown: + // No big queue, low errors -> scale down + expected.ProvisionedWrite = scaleDown(current.Name, + current.ProvisionedWrite, + expected.WriteScale.MinCapacity, + computeScaleDown(current.Name, m.usageRates, expected.WriteScale.TargetValue), + m.tableLastUpdated, + expected.WriteScale.InCooldown, + "metrics scale-down", + "write", + m.usageRates) + case errorRate == 0 && + m.queueLengths[2] < m.queueLengths[1] && m.queueLengths[1] < m.queueLengths[0]: + // zero errors and falling queue -> scale down to current usage + expected.ProvisionedWrite = scaleDown(current.Name, + current.ProvisionedWrite, + expected.WriteScale.MinCapacity, + computeScaleDown(current.Name, m.usageRates, expected.WriteScale.TargetValue), + m.tableLastUpdated, + expected.WriteScale.InCooldown, + "zero errors scale-down", + "write", + m.usageRates) + case errorRate > 0 && m.queueLengths[2] > float64(m.cfg.TargetQueueLen)*targetMax: + // Too big queue, some errors -> scale up + expected.ProvisionedWrite = scaleUp(current.Name, + current.ProvisionedWrite, + expected.WriteScale.MaxCapacity, + computeScaleUp(current.ProvisionedWrite, expected.WriteScale.MaxCapacity, m.cfg.ScaleUpFactor), + m.tableLastUpdated, + expected.WriteScale.OutCooldown, + "metrics max queue scale-up", + "write") + case errorRate > 0 && + m.queueLengths[2] > float64(m.cfg.TargetQueueLen) && + m.queueLengths[2] > m.queueLengths[1] && m.queueLengths[1] > m.queueLengths[0]: + // Growing queue, some errors -> scale up + expected.ProvisionedWrite = scaleUp(current.Name, + current.ProvisionedWrite, + expected.WriteScale.MaxCapacity, + computeScaleUp(current.ProvisionedWrite, expected.WriteScale.MaxCapacity, m.cfg.ScaleUpFactor), + m.tableLastUpdated, + expected.WriteScale.OutCooldown, + "metrics queue growing scale-up", + "write") + } } - return nil -} -func (m metricsData) computeScaleDown(current, expected chunk.TableDesc) int64 { - usageRate := m.usageRates[expected.Name] - return int64(usageRate * 100.0 / expected.WriteScale.TargetValue) + if expected.ReadScale.Enabled { + // default if no action is taken is to use the currently provisioned setting + expected.ProvisionedRead = current.ProvisionedRead + readUsageRate := m.usageReadRates[expected.Name] + readErrorRate := m.readErrorRates[expected.Name] + + level.Info(util.Logger).Log("msg", "checking read metrics", "table", current.Name, "errorRate", readErrorRate, "readUsageRate", readUsageRate) + // Read Scaling + switch { + // the table is at low/minimum capacity and it is being used -> scale up + case readUsageRate > 0 && current.ProvisionedRead < expected.ReadScale.MaxCapacity/10: + expected.ProvisionedRead = scaleUp( + current.Name, + current.ProvisionedRead, + expected.ReadScale.MaxCapacity, + computeScaleUp(current.ProvisionedRead, expected.ReadScale.MaxCapacity, m.cfg.ScaleUpFactor), + m.tableReadLastUpdated, expected.ReadScale.OutCooldown, + "table is being used. 
scale up", + "read") + case readErrorRate > 0 && readUsageRate > 0: + // Queries are causing read throttling on the table -> scale up + expected.ProvisionedRead = scaleUp( + current.Name, + current.ProvisionedRead, + expected.ReadScale.MaxCapacity, + computeScaleUp(current.ProvisionedRead, expected.ReadScale.MaxCapacity, m.cfg.ScaleUpFactor), + m.tableReadLastUpdated, expected.ReadScale.OutCooldown, + "table is in use and there are read throttle errors, scale up", + "read") + case readErrorRate == 0 && readUsageRate == 0: + // this table is not being used. -> scale down + expected.ProvisionedRead = scaleDown(current.Name, + current.ProvisionedRead, + expected.ReadScale.MinCapacity, + computeScaleDown(current.Name, m.usageReadRates, expected.ReadScale.TargetValue), + m.tableReadLastUpdated, + expected.ReadScale.InCooldown, + "table is not in use. scale down", "read", + nil) + } + } + + return nil } -func (m metricsData) computeScaleUp(current, expected chunk.TableDesc) int64 { - scaleUp := int64(float64(current.ProvisionedWrite) * m.cfg.ScaleUpFactor) +func computeScaleUp(currentValue, maxValue int64, scaleFactor float64) int64 { + scaleUp := int64(float64(currentValue) * scaleFactor) // Scale up minimum of 10% of max capacity, to avoid futzing around at low levels - minIncrement := expected.WriteScale.MaxCapacity / 10 - if scaleUp < current.ProvisionedWrite+minIncrement { - scaleUp = current.ProvisionedWrite + minIncrement + minIncrement := maxValue / 10 + if scaleUp < currentValue+minIncrement { + scaleUp = currentValue + minIncrement } return scaleUp } -func (m *metricsData) scaleDownWrite(current chunk.TableDesc, expected *chunk.TableDesc, newWrite int64, msg string) { - if newWrite < expected.WriteScale.MinCapacity { - newWrite = expected.WriteScale.MinCapacity +func computeScaleDown(currentName string, usageRates map[string]float64, targetValue float64) int64 { + usageRate := usageRates[currentName] + return int64(usageRate * 100.0 / targetValue) +} + +func scaleDown(tableName string, currentValue, minValue int64, newValue int64, lastUpdated map[string]time.Time, coolDown int64, msg, operation string, usageRates map[string]float64) int64 { + if newValue < minValue { + newValue = minValue } // If we're already at or below the requested value, it's not a scale-down. 
- if newWrite >= current.ProvisionedWrite { - return + if newValue >= currentValue { + return currentValue } - earliest := m.tableLastUpdated[current.Name].Add(time.Duration(expected.WriteScale.InCooldown) * time.Second) + + earliest := lastUpdated[tableName].Add(time.Duration(coolDown) * time.Second) if earliest.After(mtime.Now()) { - level.Info(util.Logger).Log("msg", "deferring "+msg, "table", current.Name, "till", earliest) - return + level.Info(util.Logger).Log("msg", "deferring "+msg, "table", tableName, "till", earliest, "op", operation) + return currentValue } + // Reject a change that is less than 20% - AWS rate-limits scale-downs so save // our chances until it makes a bigger difference - if newWrite > current.ProvisionedWrite*4/5 { - level.Info(util.Logger).Log("msg", "rejected de minimis "+msg, "table", current.Name, "current", current.ProvisionedWrite, "proposed", newWrite) - return - } - // Check that the ingesters seem to be doing some work - don't want to scale down - // if all our metrics are returning zero, or all the ingesters have crashed, etc - totalUsage := 0.0 - for _, u := range m.usageRates { - totalUsage += u + if newValue > currentValue*4/5 { + level.Info(util.Logger).Log("msg", "rejected de minimis "+msg, "table", tableName, "current", currentValue, "proposed", newValue, "op", operation) + return currentValue } - if totalUsage < minUsageForScaledown { - level.Info(util.Logger).Log("msg", "rejected low usage "+msg, "table", current.Name, "totalUsage", totalUsage) - return + + if usageRates != nil { + // Check that the ingesters seem to be doing some work - don't want to scale down + // if all our metrics are returning zero, or all the ingesters have crashed, etc + totalUsage := 0.0 + for _, u := range usageRates { + totalUsage += u + } + if totalUsage < minUsageForScaledown { + level.Info(util.Logger).Log("msg", "rejected low usage "+msg, "table", tableName, "totalUsage", totalUsage, "op", operation) + return currentValue + } } - level.Info(util.Logger).Log("msg", msg, "table", current.Name, "write", newWrite) - expected.ProvisionedWrite = newWrite - m.tableLastUpdated[current.Name] = mtime.Now() + level.Info(util.Logger).Log("msg", msg, "table", tableName, operation, newValue) + lastUpdated[tableName] = mtime.Now() + return newValue } -func (m *metricsData) scaleUpWrite(current chunk.TableDesc, expected *chunk.TableDesc, newWrite int64, msg string) { - if newWrite > expected.WriteScale.MaxCapacity { - newWrite = expected.WriteScale.MaxCapacity - } - earliest := m.tableLastUpdated[current.Name].Add(time.Duration(expected.WriteScale.OutCooldown) * time.Second) - if earliest.After(mtime.Now()) { - level.Info(util.Logger).Log("msg", "deferring "+msg, "table", current.Name, "till", earliest) - return +func scaleUp(tableName string, currentValue, maxValue int64, newValue int64, lastUpdated map[string]time.Time, coolDown int64, msg, operation string) int64 { + if newValue > maxValue { + newValue = maxValue } - if newWrite > current.ProvisionedWrite { - level.Info(util.Logger).Log("msg", msg, "table", current.Name, "write", newWrite) - expected.ProvisionedWrite = newWrite - m.tableLastUpdated[current.Name] = mtime.Now() + earliest := lastUpdated[tableName].Add(time.Duration(coolDown) * time.Second) + if !earliest.After(mtime.Now()) && newValue > currentValue { + level.Info(util.Logger).Log("msg", msg, "table", tableName, operation, newValue) + lastUpdated[tableName] = mtime.Now() + return newValue } + + level.Info(util.Logger).Log("msg", "deferring "+msg, "table", 
tableName, "till", earliest) + return currentValue } func (m *metricsData) update(ctx context.Context) error { @@ -225,6 +314,22 @@ func (m *metricsData) update(ctx context.Context) error { return err } + readUsageMatrix, err := promQuery(ctx, m.promAPI, m.cfg.ReadUsageQuery, 0, time.Second) + if err != nil { + return err + } + if m.usageReadRates, err = extractRates(readUsageMatrix); err != nil { + return err + } + + readErrorMatrix, err := promQuery(ctx, m.promAPI, m.cfg.ReadErrorQuery, 0, time.Second) + if err != nil { + return err + } + if m.readErrorRates, err = extractRates(readErrorMatrix); err != nil { + return err + } + return nil } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/s3_storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/s3_storage_client.go index 439970c4..81a18254 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/s3_storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/s3_storage_client.go @@ -95,8 +95,7 @@ func (a s3ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err ) for i := range chunks { - // Encode the chunk first - checksum is calculated as a side effect. - buf, err := chunks[i].Encode() + buf, err := chunks[i].Encoded() if err != nil { return err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/storage_client.go index e3991206..28df0c0b 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/storage_client.go @@ -265,8 +265,7 @@ func (b *readBatchIter) Value() []byte { // PutChunks implements chunk.ObjectClient. func (s *StorageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { for i := range chunks { - // Encode the chunk first - checksum is calculated as a side effect. - buf, err := chunks[i].Encode() + buf, err := chunks[i].Encoded() if err != nil { return errors.WithStack(err) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk.go index 78de0621..1ce3b1b9 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk.go @@ -188,18 +188,14 @@ var writerPool = sync.Pool{ New: func() interface{} { return snappy.NewBufferedWriter(nil) }, } -// Encode writes the chunk out to a big write buffer, then calculates the checksum. -func (c *Chunk) Encode() ([]byte, error) { - if c.encoded != nil { - return c.encoded, nil - } - +// Encode writes the chunk into a buffer, and calculates the checksum. +func (c *Chunk) Encode() error { var buf bytes.Buffer // Write 4 empty bytes first - we will come back and put the len in here. metadataLenBytes := [4]byte{} if _, err := buf.Write(metadataLenBytes[:]); err != nil { - return nil, err + return err } // Encode chunk metadata into snappy-compressed buffer @@ -208,7 +204,7 @@ func (c *Chunk) Encode() ([]byte, error) { writer.Reset(&buf) json := jsoniter.ConfigFastest if err := json.NewEncoder(writer).Encode(c); err != nil { - return nil, err + return err } writer.Close() @@ -221,12 +217,12 @@ func (c *Chunk) Encode() ([]byte, error) { // Write another 4 empty bytes - we will come back and put the len in here. 
dataLenBytes := [4]byte{} if _, err := buf.Write(dataLenBytes[:]); err != nil { - return nil, err + return err } // And now the chunk data if err := c.Data.Marshal(&buf); err != nil { - return nil, err + return err } // Now write the data len back into the buf. @@ -237,6 +233,16 @@ func (c *Chunk) Encode() ([]byte, error) { c.encoded = buf.Bytes() c.ChecksumSet = true c.Checksum = crc32.Checksum(c.encoded, castagnoliTable) + return nil +} + +// Encoded returns the buffer created by Encoded() +func (c *Chunk) Encoded() ([]byte, error) { + if c.encoded == nil { + if err := c.Encode(); err != nil { + return nil, err + } + } return c.encoded, nil } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go index d33a509a..2790e55c 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go @@ -128,9 +128,6 @@ func (c *store) PutOne(ctx context.Context, from, through model.Time, chunk Chun return err } - // Horribly, PutChunks mutates the chunk by setting its checksum. By putting - // the chunk in a slice we are in fact passing by reference, so below we - // need to make sure we pick the chunk back out the slice. chunks := []Chunk{chunk} err = c.storage.PutChunks(ctx, chunks) @@ -140,7 +137,7 @@ func (c *store) PutOne(ctx context.Context, from, through model.Time, chunk Chun c.writeBackCache(ctx, chunks) - writeReqs, err := c.calculateIndexEntries(userID, from, through, chunks[0]) + writeReqs, err := c.calculateIndexEntries(userID, from, through, chunk) if err != nil { return err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go index bfbe79f4..b61a2eaa 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go @@ -142,7 +142,7 @@ func (c *Fetcher) writeBackCache(ctx context.Context, chunks []Chunk) error { keys := make([]string, 0, len(chunks)) bufs := make([][]byte, 0, len(chunks)) for i := range chunks { - encoded, err := chunks[i].Encode() + encoded, err := chunks[i].Encoded() // TODO don't fail, just log and conitnue? if err != nil { return err diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_object_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_object_client.go index 7235ccc7..1dab5853 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_object_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_object_client.go @@ -46,8 +46,7 @@ func (s *bigtableObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chu muts := map[string][]*bigtable.Mutation{} for i := range chunks { - // Encode the chunk first - checksum is calculated as a side effect. 
- buf, err := chunks[i].Encode() + buf, err := chunks[i].Encoded() if err != nil { return err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/gcs_object_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/gcs_object_client.go index ae438065..1f621cf6 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/gcs_object_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/gcs_object_client.go @@ -54,7 +54,7 @@ func (s *gcsObjectClient) Stop() { func (s *gcsObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { for _, chunk := range chunks { - buf, err := chunk.Encode() + buf, err := chunk.Encoded() if err != nil { return err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/inmemory_storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/inmemory_storage_client.go index a397b74d..cd29bf4e 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/inmemory_storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/inmemory_storage_client.go @@ -280,7 +280,7 @@ func (m *MockStorage) PutChunks(_ context.Context, chunks []Chunk) error { defer m.mtx.Unlock() for i := range chunks { - buf, err := chunks[i].Encode() + buf, err := chunks[i].Encoded() if err != nil { return err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/local/fs_object_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/local/fs_object_client.go index e21b2302..164e877b 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/local/fs_object_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/local/fs_object_client.go @@ -40,8 +40,7 @@ func (fsObjectClient) Stop() {} func (f *fsObjectClient) PutChunks(_ context.Context, chunks []chunk.Chunk) error { for i := range chunks { - // Encode the chunk first - checksum is calculated as a side effect. 
- buf, err := chunks[i].Encode() + buf, err := chunks[i].Encoded() if err != nil { return err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go index f787ab2b..8ddb5460 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go @@ -3,6 +3,7 @@ package chunk import ( "flag" "fmt" + "github.com/go-kit/kit/log/level" "os" "strconv" "time" @@ -344,6 +345,7 @@ func (cfg *PeriodicTableConfig) periodicTables(from, through model.Time, pCfg Pr lastTable = through.Unix() / periodSecs tablesToKeep = int64(int64(retention/time.Second) / periodSecs) now = mtime.Now().Unix() + nowWeek = now / periodSecs result = []TableDesc{} ) // If through ends on 00:00 of the day, don't include the upcoming day @@ -357,23 +359,61 @@ func (cfg *PeriodicTableConfig) periodicTables(from, through model.Time, pCfg Pr for i := firstTable; i <= lastTable; i++ { table := TableDesc{ // Name construction needs to be consistent with chunk_store.bigBuckets - Name: cfg.Prefix + strconv.Itoa(int(i)), - ProvisionedRead: pCfg.InactiveReadThroughput, - ProvisionedWrite: pCfg.InactiveWriteThroughput, - Tags: cfg.Tags, + Name: cfg.Prefix + strconv.Itoa(int(i)), + ProvisionedRead: pCfg.InactiveReadThroughput, + ProvisionedWrite: pCfg.InactiveWriteThroughput, + UseOnDemandIOMode: pCfg.InactiveThroughputOnDemandMode, + Tags: cfg.Tags, } + level.Debug(util.Logger).Log("msg", "Expected Table", "tableName", table.Name, + "provisionedRead", table.ProvisionedRead, + "provisionedWrite", table.ProvisionedWrite, + "useOnDemandMode", table.UseOnDemandIOMode, + ) // if now is within table [start - grace, end + grace), then we need some write throughput if (i*periodSecs)-beginGraceSecs <= now && now < (i*periodSecs)+periodSecs+endGraceSecs { table.ProvisionedRead = pCfg.ProvisionedReadThroughput table.ProvisionedWrite = pCfg.ProvisionedWriteThroughput + table.UseOnDemandIOMode = pCfg.ProvisionedThroughputOnDemandMode if pCfg.WriteScale.Enabled { table.WriteScale = pCfg.WriteScale + table.UseOnDemandIOMode = false } - } else if pCfg.InactiveWriteScale.Enabled && i >= (lastTable-pCfg.InactiveWriteScaleLastN) { + + if pCfg.ReadScale.Enabled { + table.ReadScale = pCfg.ReadScale + table.UseOnDemandIOMode = false + } + level.Debug(util.Logger).Log("msg", "Table is Active", + "tableName", table.Name, + "provisionedRead", table.ProvisionedRead, + "provisionedWrite", table.ProvisionedWrite, + "useOnDemandMode", table.UseOnDemandIOMode, + "useWriteAutoScale", table.WriteScale.Enabled, + "useReadAutoScale", table.ReadScale.Enabled) + + } else if pCfg.InactiveWriteScale.Enabled || pCfg.InactiveReadScale.Enabled { // Autoscale last N tables - table.WriteScale = pCfg.InactiveWriteScale + // this is measured against "now", since the lastWeek is the final week in the schema config range + // the N last tables in that range will always be set to the inactive scaling settings. 
+ if pCfg.InactiveWriteScale.Enabled && i >= (nowWeek-pCfg.InactiveWriteScaleLastN) { + table.WriteScale = pCfg.InactiveWriteScale + table.UseOnDemandIOMode = false + } + if pCfg.InactiveReadScale.Enabled && i >= (nowWeek-pCfg.InactiveReadScaleLastN) { + table.ReadScale = pCfg.InactiveReadScale + table.UseOnDemandIOMode = false + } + + level.Debug(util.Logger).Log("msg", "Table is Inactive", + "tableName", table.Name, + "provisionedRead", table.ProvisionedRead, + "provisionedWrite", table.ProvisionedWrite, + "useOnDemandMode", table.UseOnDemandIOMode, + "useWriteAutoScale", table.WriteScale.Enabled, + "useReadAutoScale", table.ReadScale.Enabled) } result = append(result, table) diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go index 9ae1b2dc..a6c837c3 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go @@ -337,9 +337,6 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun return err } - // Horribly, PutChunks mutates the chunk by setting its checksum. By putting - // the chunk in a slice we are in fact passing by reference, so below we - // need to make sure we pick the chunk back out the slice. chunks := []Chunk{chunk} err = c.storage.PutChunks(ctx, chunks) @@ -349,7 +346,7 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun c.writeBackCache(ctx, chunks) - writeReqs, keysToCache, err := c.calculateIndexEntries(userID, from, through, chunks[0]) + writeReqs, keysToCache, err := c.calculateIndexEntries(userID, from, through, chunk) if err != nil { return err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/table_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/table_client.go index c7a44799..43fcc0d1 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/table_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/table_client.go @@ -13,11 +13,13 @@ type TableClient interface { // TableDesc describes a table. type TableDesc struct { - Name string - ProvisionedRead int64 - ProvisionedWrite int64 - Tags Tags - WriteScale AutoScalingConfig + Name string + UseOnDemandIOMode bool + ProvisionedRead int64 + ProvisionedWrite int64 + Tags Tags + WriteScale AutoScalingConfig + ReadScale AutoScalingConfig } // Equals returns true if other matches desc. 
@@ -26,7 +28,12 @@ func (desc TableDesc) Equals(other TableDesc) bool { return false } - if desc.ProvisionedRead != other.ProvisionedRead { + if desc.ReadScale != other.ReadScale { + return false + } + + // Only check provisioned read if auto scaling is disabled + if !desc.ReadScale.Enabled && desc.ProvisionedRead != other.ProvisionedRead { return false } @@ -35,6 +42,11 @@ func (desc TableDesc) Equals(other TableDesc) bool { return false } + // if the billing mode needs updating + if desc.UseOnDemandIOMode != other.UseOnDemandIOMode { + return false + } + if !desc.Tags.Equals(other.Tags) { return false } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go index 07b3c0ed..988ba781 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go @@ -65,14 +65,19 @@ type TableManagerConfig struct { // ProvisionConfig holds config for provisioning capacity (on DynamoDB) type ProvisionConfig struct { - ProvisionedWriteThroughput int64 - ProvisionedReadThroughput int64 - InactiveWriteThroughput int64 - InactiveReadThroughput int64 + ProvisionedThroughputOnDemandMode bool + ProvisionedWriteThroughput int64 + ProvisionedReadThroughput int64 + InactiveThroughputOnDemandMode bool + InactiveWriteThroughput int64 + InactiveReadThroughput int64 WriteScale AutoScalingConfig InactiveWriteScale AutoScalingConfig InactiveWriteScaleLastN int64 + ReadScale AutoScalingConfig + InactiveReadScale AutoScalingConfig + InactiveReadScaleLastN int64 } // RegisterFlags adds the flags required to config this to the given FlagSet. @@ -91,12 +96,18 @@ func (cfg *TableManagerConfig) RegisterFlags(f *flag.FlagSet) { func (cfg *ProvisionConfig) RegisterFlags(argPrefix string, f *flag.FlagSet) { f.Int64Var(&cfg.ProvisionedWriteThroughput, argPrefix+".write-throughput", 3000, "DynamoDB table default write throughput.") f.Int64Var(&cfg.ProvisionedReadThroughput, argPrefix+".read-throughput", 300, "DynamoDB table default read throughput.") + f.BoolVar(&cfg.ProvisionedThroughputOnDemandMode, argPrefix+".enable-ondemand-throughput-mode", false, "Enables on demand througput provisioning for the storage provider (if supported). Applies only to tables which are not autoscaled") f.Int64Var(&cfg.InactiveWriteThroughput, argPrefix+".inactive-write-throughput", 1, "DynamoDB table write throughput for inactive tables.") f.Int64Var(&cfg.InactiveReadThroughput, argPrefix+".inactive-read-throughput", 300, "DynamoDB table read throughput for inactive tables.") + f.BoolVar(&cfg.InactiveThroughputOnDemandMode, argPrefix+".inactive-enable-ondemand-throughput-mode", false, "Enables on demand througput provisioning for the storage provider (if supported). 
Applies only to tables which are not autoscaled") cfg.WriteScale.RegisterFlags(argPrefix+".write-throughput.scale", f) cfg.InactiveWriteScale.RegisterFlags(argPrefix+".inactive-write-throughput.scale", f) f.Int64Var(&cfg.InactiveWriteScaleLastN, argPrefix+".inactive-write-throughput.scale-last-n", 4, "Number of last inactive tables to enable write autoscale.") + + cfg.ReadScale.RegisterFlags(argPrefix+".read-throughput.scale", f) + cfg.InactiveReadScale.RegisterFlags(argPrefix+".inactive-read-throughput.scale", f) + f.Int64Var(&cfg.InactiveReadScaleLastN, argPrefix+".inactive-read-throughput.scale-last-n", 4, "Number of last inactive tables to enable read autoscale.") } // Tags is a string-string map that implements flag.Value. @@ -235,10 +246,11 @@ func (m *TableManager) calculateExpectedTables() []TableDesc { } table := TableDesc{ - Name: config.IndexTables.Prefix, - ProvisionedRead: m.cfg.IndexTables.InactiveReadThroughput, - ProvisionedWrite: m.cfg.IndexTables.InactiveWriteThroughput, - Tags: config.IndexTables.Tags, + Name: config.IndexTables.Prefix, + ProvisionedRead: m.cfg.IndexTables.InactiveReadThroughput, + ProvisionedWrite: m.cfg.IndexTables.InactiveWriteThroughput, + UseOnDemandIOMode: m.cfg.IndexTables.InactiveThroughputOnDemandMode, + Tags: config.IndexTables.Tags, } isActive := true if i+1 < len(m.schemaCfg.Configs) { @@ -250,14 +262,20 @@ func (m *TableManager) calculateExpectedTables() []TableDesc { ) if now >= endTime+gracePeriodSecs+maxChunkAgeSecs { isActive = false + } } if isActive { table.ProvisionedRead = m.cfg.IndexTables.ProvisionedReadThroughput table.ProvisionedWrite = m.cfg.IndexTables.ProvisionedWriteThroughput - + table.UseOnDemandIOMode = m.cfg.IndexTables.ProvisionedThroughputOnDemandMode if m.cfg.IndexTables.WriteScale.Enabled { table.WriteScale = m.cfg.IndexTables.WriteScale + table.UseOnDemandIOMode = false + } + if m.cfg.IndexTables.ReadScale.Enabled { + table.ReadScale = m.cfg.IndexTables.ReadScale + table.UseOnDemandIOMode = false } } result = append(result, table) diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/testutils/testutils.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/testutils/testutils.go index b3095884..123c4652 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/testutils/testutils.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/testutils/testutils.go @@ -58,10 +58,6 @@ func CreateChunks(startIndex, batchSize int) ([]string, []chunk.Chunk, error) { "index": model.LabelValue(strconv.Itoa(startIndex*batchSize + j)), }) chunks = append(chunks, chunk) - _, err := chunk.Encode() // Need to encode it, side effect calculates crc - if err != nil { - return nil, nil, err - } keys = append(keys, chunk.ExternalKey()) } return keys, chunks, nil @@ -86,7 +82,7 @@ func dummyChunkFor(now model.Time, metric model.Metric) chunk.Chunk { now, ) // Force checksum calculation. 
- _, err := chunk.Encode() + err := chunk.Encode() if err != nil { panic(err) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/index/index.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/index/index.go index 51cc9248..84ab8d2f 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/index/index.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/index/index.go @@ -100,10 +100,11 @@ func (shard *indexShard) add(metric []client.LabelPair, fp model.Fingerprint) { defer shard.mtx.Unlock() for _, pair := range metric { - name, value := model.LabelName(pair.Name), model.LabelValue(pair.Value) - values, ok := shard.idx[name] + value := model.LabelValue(pair.Value) + values, ok := shard.idx[model.LabelName(pair.Name)] if !ok { values = map[model.LabelValue][]model.Fingerprint{} + shard.idx[model.LabelName(pair.Name)] = values } fingerprints := values[value] // Insert into the right position to keep fingerprints sorted @@ -114,7 +115,6 @@ func (shard *indexShard) add(metric []client.LabelPair, fp model.Fingerprint) { copy(fingerprints[j+1:], fingerprints[j:]) fingerprints[j] = fp values[value] = fingerprints - shard.idx[name] = values } } diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/validation/limits.go b/vendor/github.com/cortexproject/cortex/pkg/util/validation/limits.go index c1dea6ec..a2186c96 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/util/validation/limits.go +++ b/vendor/github.com/cortexproject/cortex/pkg/util/validation/limits.go @@ -19,6 +19,7 @@ type Limits struct { RejectOldSamples bool `yaml:"reject_old_samples"` RejectOldSamplesMaxAge time.Duration `yaml:"reject_old_samples_max_age"` CreationGracePeriod time.Duration `yaml:"creation_grace_period"` + EnforceMetricName bool `yaml:"enforce_metric_name"` // Ingester enforced limits. MaxSeriesPerQuery int `yaml:"max_series_per_query"` @@ -45,6 +46,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&l.RejectOldSamples, "validation.reject-old-samples", false, "Reject old samples.") f.DurationVar(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", 14*24*time.Hour, "Maximum accepted sample age before rejecting.") f.DurationVar(&l.CreationGracePeriod, "validation.create-grace-period", 10*time.Minute, "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.") + f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.") f.IntVar(&l.MaxSeriesPerQuery, "ingester.max-series-per-query", 100000, "The maximum number of series that a query can return.") f.IntVar(&l.MaxSamplesPerQuery, "ingester.max-samples-per-query", 1000000, "The maximum number of samples that a query can return.") diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/validation/override.go b/vendor/github.com/cortexproject/cortex/pkg/util/validation/override.go index 5927e453..ce810e69 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/util/validation/override.go +++ b/vendor/github.com/cortexproject/cortex/pkg/util/validation/override.go @@ -241,3 +241,10 @@ func (o *Overrides) MaxQueryLength(userID string) time.Duration { return l.MaxQueryLength }) } + +// EnforceMetricName whether to enforce the presence of a metric name. 
+func (o *Overrides) EnforceMetricName(userID string) bool { + return o.getBool(userID, func(l *Limits) bool { + return l.EnforceMetricName + }) +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/validation/validate.go b/vendor/github.com/cortexproject/cortex/pkg/util/validation/validate.go index 88a45998..2718638c 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/util/validation/validate.go +++ b/vendor/github.com/cortexproject/cortex/pkg/util/validation/validate.go @@ -65,12 +65,14 @@ func (cfg *Overrides) ValidateSample(userID string, metricName []byte, s client. // ValidateLabels returns an err if the labels are invalid. func (cfg *Overrides) ValidateLabels(userID string, ls []client.LabelPair) error { metricName, err := extract.MetricNameFromLabelPairs(ls) - if err != nil { - return httpgrpc.Errorf(http.StatusBadRequest, errMissingMetricName) - } + if cfg.EnforceMetricName(userID) { + if err != nil { + return httpgrpc.Errorf(http.StatusBadRequest, errMissingMetricName) + } - if !model.IsValidMetricName(model.LabelValue(metricName)) { - return httpgrpc.Errorf(http.StatusBadRequest, errInvalidMetricName, metricName) + if !model.IsValidMetricName(model.LabelValue(metricName)) { + return httpgrpc.Errorf(http.StatusBadRequest, errInvalidMetricName, metricName) + } } numLabelNames := len(ls) -- GitLab
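
A note on the validation change, which is the point of this patch: Loki log streams carry labels but no __name__ label, so the new enforce_metric_name limit (flag validation.enforce-metric-name, default true, keeping Cortex behaviour unchanged) gates the metric-name checks in ValidateLabels, and the Loki configs above set it to false. The sketch below mirrors that gating with stand-in helpers and a plain label map; it is illustrative only, not the Cortex API.

package main

import (
    "errors"
    "fmt"
    "regexp"
)

// metricNameRE approximates Prometheus metric-name validity; the real code
// uses model.IsValidMetricName.
var metricNameRE = regexp.MustCompile(`^[a-zA-Z_:][a-zA-Z0-9_:]*$`)

// checkLabels mirrors the gating added to ValidateLabels: the metric-name
// checks only run when enforcement is enabled for the tenant.
func checkLabels(enforceMetricName bool, labels map[string]string) error {
    name, ok := labels["__name__"]
    if enforceMetricName {
        if !ok {
            return errors.New("sample missing metric name")
        }
        if !metricNameRE.MatchString(name) {
            return fmt.Errorf("invalid metric name %q", name)
        }
    }
    return nil
}

func main() {
    // A Loki log stream: labels but no __name__.
    stream := map[string]string{"job": "varlogs", "filename": "/var/log/syslog"}
    fmt.Println(checkLabels(true, stream))  // rejected: sample missing metric name
    fmt.Println(checkLabels(false, stream)) // <nil>: accepted, as Loki configures it
}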
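
A note on the chunk encoding refactor pulled in from Cortex: Encode now returns only an error and caches the serialized bytes and checksum on the chunk as a side effect, while the new Encoded accessor encodes lazily on first use; the storage clients in this patch all switch from Encode to Encoded. A simplified sketch of that pattern follows, with the real snappy/JSON framing replaced by a plain byte copy, so only the lazy-caching shape is shown.

package main

import (
    "fmt"
    "hash/crc32"
)

// chunkSketch is a reduced stand-in for chunk.Chunk.
type chunkSketch struct {
    data     []byte
    encoded  []byte
    checksum uint32
}

// Encode builds the serialized form and records the checksum as a side
// effect; like the patched method it returns only an error.
func (c *chunkSketch) Encode() error {
    c.encoded = append([]byte(nil), c.data...)
    c.checksum = crc32.ChecksumIEEE(c.encoded)
    return nil
}

// Encoded returns the cached buffer, encoding on first use - the call the
// storage clients now make instead of Encode.
func (c *chunkSketch) Encoded() ([]byte, error) {
    if c.encoded == nil {
        if err := c.Encode(); err != nil {
            return nil, err
        }
    }
    return c.encoded, nil
}

func main() {
    c := &chunkSketch{data: []byte("some log lines")}
    buf, err := c.Encoded() // first call encodes and caches
    fmt.Println(len(buf), c.checksum, err)
}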
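
A note on the metrics autoscaling refactor: the per-table scale-up and scale-down logic is now shared between write and the new read capacity paths through the extracted computeScaleUp, computeScaleDown, scaleUp and scaleDown helpers. computeScaleUp enforces a floor of 10% of the table's maximum capacity per step; the driver below (capacity numbers are illustrative, not from the patch) shows where that floor takes over.

package main

import "fmt"

// computeScaleUp is the helper extracted in this patch: scale by a factor,
// but never by less than 10% of the table's maximum capacity per step.
func computeScaleUp(currentValue, maxValue int64, scaleFactor float64) int64 {
    scaleUp := int64(float64(currentValue) * scaleFactor)
    // Scale up minimum of 10% of max capacity, to avoid futzing around at low levels
    minIncrement := maxValue / 10
    if scaleUp < currentValue+minIncrement {
        scaleUp = currentValue + minIncrement
    }
    return scaleUp
}

func main() {
    // At low provision the 10%-of-max floor wins:
    // 300 * 1.3 = 390, but 300 + 3000/10 = 600.
    fmt.Println(computeScaleUp(300, 3000, 1.3)) // 600
    // At higher provision the multiplicative factor dominates.
    fmt.Println(computeScaleUp(2000, 3000, 1.3)) // 2600
}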
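
A note on the new DynamoDB billing-mode handling: the on-demand (PAY_PER_REQUEST) flags apply only to tables that are not autoscaled, so enabling read or write autoscaling forces the expected table back to provisioned mode, and UpdateTable only issues a billing-mode change when the current and expected modes differ, since AWS rejects setting a table to the mode it is already in. A reduced sketch of that precedence, using simplified stand-in types rather than chunk.TableDesc:

package main

import "fmt"

// tableMode is a simplified stand-in for the capacity-related fields of
// chunk.TableDesc.
type tableMode struct {
    UseOnDemandIOMode     bool
    WriteScale, ReadScale bool
}

// expectedMode mirrors how the patch builds the expected table: the
// on-demand flag only survives when neither autoscaler is enabled.
func expectedMode(onDemandFlag, writeScaleEnabled, readScaleEnabled bool) tableMode {
    t := tableMode{UseOnDemandIOMode: onDemandFlag}
    if writeScaleEnabled {
        t.WriteScale = true
        t.UseOnDemandIOMode = false
    }
    if readScaleEnabled {
        t.ReadScale = true
        t.UseOnDemandIOMode = false
    }
    return t
}

func main() {
    fmt.Printf("%+v\n", expectedMode(true, false, false)) // stays on-demand
    fmt.Printf("%+v\n", expectedMode(true, true, false))  // autoscaled -> provisioned
}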