diff --git a/CHANGELOG.md b/CHANGELOG.md index 52bdf4977c6..53ac8a483eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ * Flags: Renamed `-querier.parquet-queryable-shard-cache-size` to `-querier.parquet-shard-cache-size` and `-querier.parquet-queryable-shard-cache-ttl` to `-querier.parquet-shard-cache-ttl`. * Config: Renamed `parquet_queryable_shard_cache_size` to `parquet_shard_cache_size` and `parquet_queryable_shard_cache_ttl` to `parquet_shard_cache_ttl`. * [FEATURE] StoreGateway: Introduces a new parquet mode. #7046 +* [FEATURE] StoreGateway: Add a parquet shard cache to parquet mode. #7166 * [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077 * [ENHANCEMENT] StoreGateway: Add tracings to parquet mode. #7125 * [ENHANCEMENT] Alertmanager: Upgrade alertmanger to 0.29.0 and add a new incidentIO integration. #7092 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index a6e0a29e67f..f69c46e791b 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1793,6 +1793,14 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size [request_token_bucket_size: | default = 4194304] + # [Experimental] Maximum size of the Parquet shard cache. 0 to disable. + # CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-size + [parquet_shard_cache_size: | default = 512] + + # [Experimental] TTL of the Parquet shard cache. 0 to no TTL. + # CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl + [parquet_shard_cache_ttl: | default = 24h] + tsdb: # Local directory to store TSDBs in the ingesters. # CLI flag: -blocks-storage.tsdb.dir diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 868f534b2a0..530f66e07ce 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -1871,6 +1871,14 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size [request_token_bucket_size: | default = 4194304] + # [Experimental] Maximum size of the Parquet shard cache. 0 to disable. + # CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-size + [parquet_shard_cache_size: | default = 512] + + # [Experimental] TTL of the Parquet shard cache. 0 to no TTL. + # CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl + [parquet_shard_cache_ttl: | default = 24h] + tsdb: # Local directory to store TSDBs in the ingesters. # CLI flag: -blocks-storage.tsdb.dir diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 75257b54541..d60f7305b6d 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2477,6 +2477,14 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size [request_token_bucket_size: | default = 4194304] + # [Experimental] Maximum size of the Parquet shard cache. 0 to disable. + # CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-size + [parquet_shard_cache_size: | default = 512] + + # [Experimental] TTL of the Parquet shard cache. 0 to no TTL. + # CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl + [parquet_shard_cache_ttl: | default = 24h] + tsdb: # Local directory to store TSDBs in the ingesters. # CLI flag: -blocks-storage.tsdb.dir diff --git a/integration/querier_test.go b/integration/querier_test.go index b85dd4fed8f..66894b11f4d 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -519,6 +519,17 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { if strings.Contains(testCfg.chunkCacheBackend, tsdb.CacheBackendRedis) { require.NoError(t, storeGateways.WaitSumMetrics(e2e.Greater(float64(0)), "thanos_cache_redis_requests_total")) } + + // ensure parquet shard cache works + require.NoError(t, storeGateways.WaitSumMetricsWithOptions(e2e.Greater(float64(0)), []string{"cortex_parquet_cache_hits_total"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "component", "store-gateway"), + labels.MustNewMatcher(labels.MatchEqual, "name", "parquet-shards")))) + require.NoError(t, storeGateways.WaitSumMetricsWithOptions(e2e.Greater(float64(0)), []string{"cortex_parquet_cache_item_count"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "component", "store-gateway"), + labels.MustNewMatcher(labels.MatchEqual, "name", "parquet-shards")))) + require.NoError(t, storeGateways.WaitSumMetricsWithOptions(e2e.Greater(float64(0)), []string{"cortex_parquet_cache_misses_total"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "component", "store-gateway"), + labels.MustNewMatcher(labels.MatchEqual, "name", "parquet-shards")))) } // Query metadata. diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 95ce6563971..c5e465c591a 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -18,6 +18,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/util/flagext" util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/parquetutil" "github.com/cortexproject/cortex/pkg/util/users" ) @@ -332,6 +333,8 @@ type BucketStoreConfig struct { // Token bucket configs TokenBucketBytesLimiter TokenBucketBytesLimiterConfig `yaml:"token_bucket_bytes_limiter"` + // Parquet shard cache config + ParquetShardCache parquetutil.CacheConfig `yaml:",inline"` } type TokenBucketBytesLimiterConfig struct { @@ -393,6 +396,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.Float64Var(&cfg.TokenBucketBytesLimiter.FetchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.fetched-chunks-token-factor", 0, "Multiplication factor used for fetched chunks token") f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-chunks-token-factor", 1, "Multiplication factor used for touched chunks token") f.IntVar(&cfg.MatchersCacheMaxItems, "blocks-storage.bucket-store.matchers-cache-max-items", 0, "Maximum number of entries in the regex matchers cache. 0 to disable.") + cfg.ParquetShardCache.RegisterFlagsWithPrefix("blocks-storage.bucket-store.", f) } // Validate the config. diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 17b0ec3094c..e6d785d17ad 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -37,6 +37,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/logging" + "github.com/weaveworks/common/user" "go.uber.org/atomic" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" @@ -761,6 +762,7 @@ func querySeries(stores BucketStores, userID, metricName string, minT, maxT int6 } ctx := setUserIDToGRPCContext(context.Background(), userID) + ctx = user.InjectOrgID(ctx, userID) srv := newBucketStoreSeriesServer(ctx) err = stores.Series(req, srv) diff --git a/pkg/storegateway/parquet_bucket_store.go b/pkg/storegateway/parquet_bucket_store.go index cc7d83f9dcd..c8fdd59a323 100644 --- a/pkg/storegateway/parquet_bucket_store.go +++ b/pkg/storegateway/parquet_bucket_store.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "github.com/cortexproject/cortex/pkg/util/parquetutil" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -37,10 +38,12 @@ type parquetBucketStore struct { chunksDecoder *schema.PrometheusParquetChunksDecoder - matcherCache storecache.MatchersCache + matcherCache storecache.MatchersCache + parquetShardCache parquetutil.CacheInterface[parquet_storage.ParquetShard] } func (p *parquetBucketStore) Close() error { + p.parquetShardCache.Close() return p.bucket.Close() } diff --git a/pkg/storegateway/parquet_bucket_stores.go b/pkg/storegateway/parquet_bucket_stores.go index 636e9e9c0c9..6fa94a81092 100644 --- a/pkg/storegateway/parquet_bucket_stores.go +++ b/pkg/storegateway/parquet_bucket_stores.go @@ -32,7 +32,9 @@ import ( "github.com/cortexproject/cortex/pkg/storage/tsdb" cortex_util "github.com/cortexproject/cortex/pkg/util" cortex_errors "github.com/cortexproject/cortex/pkg/util/errors" + "github.com/cortexproject/cortex/pkg/util/parquetutil" "github.com/cortexproject/cortex/pkg/util/spanlogger" + "github.com/cortexproject/cortex/pkg/util/users" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -52,7 +54,8 @@ type ParquetBucketStores struct { chunksDecoder *schema.PrometheusParquetChunksDecoder - matcherCache storecache.MatchersCache + matcherCache storecache.MatchersCache + parquetShardCache parquetutil.CacheInterface[parquet_storage.ParquetShard] inflightRequests *cortex_util.InflightRequestTracker } @@ -65,15 +68,21 @@ func newParquetBucketStores(cfg tsdb.BlocksStorageConfig, bucketClient objstore. return nil, err } + parquetShardCache, err := parquetutil.NewParquetShardCache[parquet_storage.ParquetShard](&cfg.BucketStore.ParquetShardCache, "parquet-shards", reg) + if err != nil { + return nil, err + } + u := &ParquetBucketStores{ - logger: logger, - cfg: cfg, - limits: limits, - bucket: cachingBucket, - stores: map[string]*parquetBucketStore{}, - storesErrors: map[string]error{}, - chunksDecoder: schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool()), - inflightRequests: cortex_util.NewInflightRequestTracker(), + logger: logger, + cfg: cfg, + limits: limits, + bucket: cachingBucket, + stores: map[string]*parquetBucketStore{}, + storesErrors: map[string]error{}, + chunksDecoder: schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool()), + inflightRequests: cortex_util.NewInflightRequestTracker(), + parquetShardCache: parquetShardCache, } if cfg.BucketStore.MatchersCacheMaxItems > 0 { @@ -246,12 +255,13 @@ func (u *ParquetBucketStores) createParquetBucketStore(userID string, userLogger userBucket := bucket.NewUserBucketClient(userID, u.bucket, u.limits) store := &parquetBucketStore{ - logger: userLogger, - bucket: userBucket, - limits: u.limits, - concurrency: 4, // TODO: make this configurable - chunksDecoder: u.chunksDecoder, - matcherCache: u.matcherCache, + logger: userLogger, + bucket: userBucket, + limits: u.limits, + concurrency: 4, // TODO: make this configurable + chunksDecoder: u.chunksDecoder, + matcherCache: u.matcherCache, + parquetShardCache: u.parquetShardCache, } return store, nil @@ -265,21 +275,35 @@ type parquetBlock struct { } func (p *parquetBucketStore) newParquetBlock(ctx context.Context, name string, labelsFileOpener, chunksFileOpener parquet_storage.ParquetOpener, d *schema.PrometheusParquetChunksDecoder, rowCountQuota *search.Quota, chunkBytesQuota *search.Quota, dataBytesQuota *search.Quota) (*parquetBlock, error) { - shard, err := parquet_storage.NewParquetShardOpener( - context.WithoutCancel(ctx), - name, - labelsFileOpener, - chunksFileOpener, - 0, - parquet_storage.WithFileOptions( - parquet.SkipMagicBytes(true), - parquet.ReadBufferSize(100*1024), - parquet.SkipBloomFilters(true), - parquet.OptimisticRead(true), - ), - ) + userID, err := users.TenantID(ctx) if err != nil { - return nil, errors.Wrapf(err, "failed to open parquet shard. block: %v", name) + return nil, err + } + + cacheKey := fmt.Sprintf("%v-%v", userID, name) + shard := p.parquetShardCache.Get(cacheKey) + + if shard == nil { + // cache miss, open parquet files + shard, err = parquet_storage.NewParquetShardOpener( + context.WithoutCancel(ctx), + name, + labelsFileOpener, + chunksFileOpener, + 0, // we always only have 1 shard - shard 0 + parquet_storage.WithFileOptions( + parquet.SkipMagicBytes(true), + parquet.ReadBufferSize(100*1024), + parquet.SkipBloomFilters(true), + parquet.OptimisticRead(true), + ), + ) + if err != nil { + return nil, errors.Wrapf(err, "failed to open parquet shard. block: %v", name) + } + + // set shard to cache + p.parquetShardCache.Set(cacheKey, shard) } s, err := shard.TSDBSchema() diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 30efc2eb96c..ad789df3614 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -2600,6 +2600,19 @@ }, "type": "object" }, + "parquet_shard_cache_size": { + "default": 512, + "description": "[Experimental] Maximum size of the Parquet shard cache. 0 to disable.", + "type": "number", + "x-cli-flag": "blocks-storage.bucket-store.parquet-shard-cache-size" + }, + "parquet_shard_cache_ttl": { + "default": "24h0m0s", + "description": "[Experimental] TTL of the Parquet shard cache. 0 to no TTL.", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.parquet-shard-cache-ttl", + "x-format": "duration" + }, "series_batch_size": { "default": 10000, "description": "Controls how many series to fetch per batch in Store Gateway. Default value is 10000.",