diff --git a/CHANGELOG.md b/CHANGELOG.md index 52bdf4977c6..46e9855fdc5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * Config: Renamed `parquet_queryable_shard_cache_size` to `parquet_shard_cache_size` and `parquet_queryable_shard_cache_ttl` to `parquet_shard_cache_ttl`. * [FEATURE] StoreGateway: Introduces a new parquet mode. #7046 * [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077 +* [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152 * [ENHANCEMENT] StoreGateway: Add tracings to parquet mode. #7125 * [ENHANCEMENT] Alertmanager: Upgrade alertmanger to 0.29.0 and add a new incidentIO integration. #7092 * [ENHANCEMENT] Querier: Add a `-querier.parquet-queryable-shard-cache-ttl` flag to add TTL to parquet shard cache. #7098 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index a6e0a29e67f..915bf258bd5 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -310,6 +310,13 @@ querier: # queryable. # CLI flag: -querier.parquet-queryable-fallback-disabled [parquet_queryable_fallback_disabled: | default = false] + + # [Experimental] If true, querier will honor projection hints and only + # materialize requested labels. Today, projection is only effective when + # Parquet Queryable is enabled. Projection is only applied when not querying + # mixed block types (parquet and non-parquet) and not querying ingesters. 
+ # CLI flag: -querier.honor-projection-hints + [honor_projection_hints: | default = false] ``` ### `blocks_storage_config` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 75257b54541..c10073e49b8 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4827,6 +4827,13 @@ thanos_engine: # need to make sure Parquet files are created before it is queryable. # CLI flag: -querier.parquet-queryable-fallback-disabled [parquet_queryable_fallback_disabled: | default = false] + +# [Experimental] If true, querier will honor projection hints and only +# materialize requested labels. Today, projection is only effective when Parquet +# Queryable is enabled. Projection is only applied when not querying mixed block +# types (parquet and non-parquet) and not querying ingesters. +# CLI flag: -querier.honor-projection-hints +[honor_projection_hints: | default = false] ``` ### `query_frontend_config` diff --git a/go.mod b/go.mod index a5f5e66dee1..9a26035ca69 100644 --- a/go.mod +++ b/go.mod @@ -87,7 +87,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/oklog/ulid/v2 v2.1.1 github.com/parquet-go/parquet-go v0.25.1 - github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71 + github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94 github.com/prometheus/client_golang/exp v0.0.0-20250914183048-a974e0d45e0a github.com/prometheus/procfs v0.16.1 github.com/sercand/kuberesolver/v5 v5.1.1 diff --git a/go.sum b/go.sum index e8d973c9536..8dab5b15dfe 100644 --- a/go.sum +++ b/go.sum @@ -1634,8 +1634,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub 
v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71 h1:BwrzRNGy0GbnBA7rQd85G6NuFvydvwTXxRB9XiA5TXk= -github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY= +github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94 h1:6WmPxbqGMjBKLOZvurIZR5eEBF0Rd0t1oQ06PMWaHe8= +github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY= github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4= github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis= github.com/prometheus/alertmanager v0.29.0 h1:/ET4NmAGx2Dv9kStrXIBqBgHyiSgIk4OetY+hoZRfgc= diff --git a/integration/parquet_querier_test.go b/integration/parquet_querier_test.go index f4bb5a6c60d..d7462a5389f 100644 --- a/integration/parquet_querier_test.go +++ b/integration/parquet_querier_test.go @@ -7,11 +7,13 @@ import ( "fmt" "math/rand" "path/filepath" + "slices" "strconv" "testing" "time" "github.com/cortexproject/promqlsmith" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" @@ -23,7 +25,9 @@ import ( e2edb "github.com/cortexproject/cortex/integration/e2e/db" "github.com/cortexproject/cortex/integration/e2ecortex" "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet" "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util/log" cortex_testutil "github.com/cortexproject/cortex/pkg/util/test" ) @@ -176,3 +180,223 @@ func TestParquetFuzz(t *testing.T) { require.NoError(t, 
cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers( labels.MustNewMatcher(labels.MatchEqual, "type", "parquet")))) } + +func TestParquetProjectionPushdownFuzz(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + consul := e2edb.NewConsulWithName("consul") + memcached := e2ecache.NewMemcached() + require.NoError(t, s.StartAndWaitReady(consul, memcached)) + + baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags()) + flags := mergeFlags( + baseFlags, + map[string]string{ + "-target": "all,parquet-converter", + "-blocks-storage.tsdb.block-ranges-period": "1m,24h", + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.bucket-store.sync-interval": "1s", + "-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s", + "-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s", + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-querier.query-store-for-labels-enabled": "true", + // compactor + "-compactor.cleanup-interval": "1s", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + // Store-gateway.
+ "-store-gateway.sharding-enabled": "false", + "--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + // parquet-converter + "-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(), + "-parquet-converter.conversion-interval": "1s", + "-parquet-converter.enabled": "true", + // Querier - Enable Thanos engine with projection optimizer + "-querier.thanos-engine": "true", + "-querier.optimizers": "propagate-matchers,sort-matchers,merge-selects,detect-histogram-stats,projection", // Enable all optimizers including projection + "-querier.enable-parquet-queryable": "true", + "-querier.honor-projection-hints": "true", // Honor projection hints + // Set query-ingesters-within to 2h so queries older than 2h don't hit ingesters + // Since test queries are 48-72h old, they won't query ingesters and projection will be enabled + "-querier.query-ingesters-within": "2h", + // Enable cache for parquet labels and chunks + "-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached", + "-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + "-blocks-storage.bucket-store.chunks-cache.backend": "inmemory,memcached", + "-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + }, + ) + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + ctx := context.Background() + rnd := rand.New(rand.NewSource(time.Now().Unix())) + dir := filepath.Join(s.SharedDir(), "data") + numSeries := 20 + numSamples := 100 + lbls := make([]labels.Labels, 0, numSeries) + scrapeInterval := time.Minute + statusCodes := []string{"200", "400", "404", "500", "502"} + methods := []string{"GET", "POST", "PUT", "DELETE"} + now := time.Now() + //
Make sure query time is old enough to not overlap with ingesters. + start := now.Add(-time.Hour * 72) + end := now.Add(-time.Hour * 48) + + // Create series with multiple labels + for i := range numSeries { + lbls = append(lbls, labels.FromStrings( + labels.MetricName, "http_requests_total", + "job", "api-server", + "instance", fmt.Sprintf("instance-%d", i%5), + "status_code", statusCodes[i%len(statusCodes)], + "method", methods[i%len(methods)], + "path", fmt.Sprintf("/api/v1/endpoint%d", i%3), + "cluster", "test-cluster", + )) + } + + id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10) + require.NoError(t, err) + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + cortex := e2ecortex.NewSingleBinary("cortex", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + + storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, err) + bkt := storage.GetBucket() + userBucket := bucket.NewUserBucketClient("user-1", bkt, nil) + + err = block.Upload(ctx, log.Logger, userBucket, filepath.Join(dir, id.String()), metadata.NoneFunc) + require.NoError(t, err) + + // Wait until we convert the blocks to parquet AND bucket index is updated + cortex_testutil.Poll(t, 300*time.Second, true, func() interface{} { + // Check if parquet marker exists + markerFound := false + err := userBucket.Iter(context.Background(), "", func(name string) error { + if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) { + markerFound = true + } + return nil + }, objstore.WithRecursiveIter()) + if err != nil || !markerFound { + return false + } + + // Check if bucket index exists AND contains the parquet block metadata + idx, err := bucketindex.ReadIndex(ctx, bkt, "user-1", nil, log.Logger) + if err != nil { + return false + } + + // Verify the block is in the
bucket index with parquet metadata + for _, b := range idx.Blocks { + if b.ID == id && b.Parquet != nil { + require.True(t, b.Parquet.Version == cortex_parquet.CurrentVersion) + return true + } + } + return false + }) + + c, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + // Wait for data to be queryable before running the projection hints tests + cortex_testutil.Poll(t, 60*time.Second, true, func() interface{} { + labelSets, err := c.Series([]string{`{job="api-server"}`}, start, end) + if err != nil { + t.Logf("Series query failed: %v", err) + return false + } + return len(labelSets) > 0 + }) + + testCases := []struct { + name string + query string + expectedLabels []string // Labels that should be present in result + }{ + { + name: "vector selector query should not use projection", + query: `http_requests_total`, + expectedLabels: []string{"__name__", "job", "instance", "status_code", "method", "path", "cluster"}, + }, + { + name: "simple_sum_by_job", + query: `sum by (job) (http_requests_total)`, + expectedLabels: []string{"job"}, + }, + { + name: "rate_with_aggregation", + query: `sum by (method) (rate(http_requests_total[5m]))`, + expectedLabels: []string{"method"}, + }, + { + name: "multiple_grouping_labels", + query: `sum by (job, status_code) (http_requests_total)`, + expectedLabels: []string{"job", "status_code"}, + }, + { + name: "aggregation without query", + query: `sum without (instance, method) (http_requests_total)`, + expectedLabels: []string{"job", "status_code", "path", "cluster"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Logf("Testing: %s", tc.query) + + // Execute instant query + result, err := c.Query(tc.query, end) + require.NoError(t, err) + require.NotNil(t, result) + + // Verify we got results + vector, ok := result.(model.Vector) + require.True(t, ok, "result should be a vector") + require.NotEmpty(t, vector, "query should return results") +
+ t.Logf("Query returned %d series", len(vector)) + + // Verify projection worked: series should only have the expected labels + for _, sample := range vector { + actualLabels := make(map[string]struct{}) + for label := range sample.Metric { + actualLabels[string(label)] = struct{}{} + } + + // Check that all expected labels are present + for _, expectedLabel := range tc.expectedLabels { + _, ok := actualLabels[expectedLabel] + require.True(t, ok, + "series should have %s label", expectedLabel) + } + + // Check that no unexpected labels are present + for lbl := range actualLabels { + if !slices.Contains(tc.expectedLabels, lbl) { + require.Failf(t, "unexpected label in result", "series should not have unexpected label: %s", lbl) + } + } + } + }) + } + + // Verify that parquet blocks were queried + require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "type", "parquet")))) +} diff --git a/integration/querier_test.go b/integration/querier_test.go index b85dd4fed8f..7aad8db0923 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -461,7 +461,7 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { require.Equal(t, model.ValVector, result.Type()) assert.Equal(t, expectedVector3, result.(model.Vector)) - if testCfg.bucketStorageType == "tedb" { + if testCfg.bucketStorageType == "tsdb" { // Check the in-memory index cache metrics (in the store-gateway).
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64((5+5+2)*numberOfCacheBackends)), "thanos_store_index_cache_requests_total")) require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index b821dced7ee..0a594730af9 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -28,6 +28,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querysharding" "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util" @@ -147,6 +148,7 @@ func NewParquetQueryable( } parquetQueryableOpts := []queryable.QueryableOpts{ + queryable.WithHonorProjectionHints(config.HonorProjectionHints), queryable.WithRowCountLimitFunc(func(ctx context.Context) int64 { // Ignore error as this shouldn't happen. // If failed to resolve tenant we will just use the default limit value. @@ -499,6 +501,14 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool sortSeries = true } + // Reset projection hints if: + // - there are mixed blocks (both parquet and non-parquet) + // - not all parquet blocks have hash column (version < 2) + if len(remaining) > 0 || !allParquetBlocksHaveHashColumn(parquet) { + hints.ProjectionLabels = nil + hints.ProjectionInclude = false + } + promises := make([]chan storage.SeriesSet, 0, 2) if len(parquet) > 0 { @@ -598,6 +608,17 @@ func (q *parquetQuerierWithFallback) incrementOpsMetric(method string, remaining } } +// allParquetBlocksHaveHashColumn checks if all parquet blocks have version >= 2, which means they have the hash column. 
+// Parquet blocks with version 1 don't have the hash column, so projection cannot be enabled for them. +func allParquetBlocksHaveHashColumn(blocks []*bucketindex.Block) bool { + for _, b := range blocks { + if b.Parquet == nil || b.Parquet.Version < cortex_parquet.ParquetConverterMarkVersion2 { + return false + } + } + return true +} + type shardMatcherLabelsFilter struct { shardMatcher *storepb.ShardMatcher } diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 70c2a5cdacd..e3cbb71c7d6 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -172,7 +172,7 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { } finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything).Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, &bucketindex.Block{ID: block2}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) @@ -241,8 +241,8 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { } finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything, mock.Anything).Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, - &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) t.Run("select", func(t *testing.T) { @@ -315,8 +315,8 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { } finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything).Return(bucketindex.Blocks{ - &bucketindex.Block{ID: 
block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, - &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) t.Run("select", func(t *testing.T) { @@ -632,6 +632,230 @@ func (mockParquetQuerier) Close() error { return nil } +func TestSelectProjectionHints(t *testing.T) { + block1 := ulid.MustNew(1, nil) + block2 := ulid.MustNew(2, nil) + now := time.Now() + + tests := map[string]struct { + minT int64 + maxT int64 + hasRemainingBlocks bool // Whether there are non-parquet (TSDB) blocks + parquetBlockVersion int // Version of parquet blocks (1 or 2) + mixedVersions bool // If true, block1 is v1, block2 is v2 + inputProjectionLabels []string + inputProjectionInclude bool // Input ProjectionInclude value + expectedProjectionLabels []string // nil means projection disabled + expectedProjectionInclude bool + }{ + "projection enabled: all parquet blocks v2, no remaining blocks": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), + hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, // Version 2 has hash column + inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, + expectedProjectionLabels: []string{"__name__", "job"}, // Preserved + expectedProjectionInclude: true, + }, + "projection disabled: mixed blocks (parquet + TSDB)": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), + hasRemainingBlocks: true, // Mixed blocks + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, + inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, + 
expectedProjectionLabels: nil, // Reset + expectedProjectionInclude: false, + }, + "projection disabled: ProjectionInclude is false": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), + hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion2, + inputProjectionLabels: []string{"job"}, + inputProjectionInclude: false, + expectedProjectionLabels: []string{"job"}, // Labels remain unchanged when ProjectionInclude is false + expectedProjectionInclude: false, + }, + "projection disabled: parquet blocks version 1 (no hash column)": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), + hasRemainingBlocks: false, + parquetBlockVersion: parquet.ParquetConverterMarkVersion1, // Version 1 doesn't have hash column + inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, + expectedProjectionLabels: nil, // Reset because version 1 doesn't support projection + expectedProjectionInclude: false, + }, + "projection disabled: mixed parquet block versions (v1 and v2)": { + minT: util.TimeToMillis(now.Add(-10 * time.Hour)), + maxT: util.TimeToMillis(now.Add(-5 * time.Hour)), + hasRemainingBlocks: false, + mixedVersions: true, // block1 is v1, block2 is v2 + inputProjectionLabels: []string{"__name__", "job"}, + inputProjectionInclude: true, + expectedProjectionLabels: nil, // Reset because not all blocks support projection + expectedProjectionInclude: false, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "user-1") + finder := &blocksFinderMock{} + + // Setup blocks + var blocks bucketindex.Blocks + if testData.hasRemainingBlocks { + // Mixed: one parquet, one TSDB + blocks = bucketindex.Blocks{ + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: testData.parquetBlockVersion}}, + &bucketindex.Block{ID: 
block2}, // No parquet metadata = TSDB block + } + } else if testData.mixedVersions { + // Mixed parquet versions: block1 is v1, block2 is v2 + blocks = bucketindex.Blocks{ + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + } + } else { + // All parquet with same version + blocks = bucketindex.Blocks{ + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: testData.parquetBlockVersion}}, + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: testData.parquetBlockVersion}}, + } + } + finder.On("GetBlocks", mock.Anything, "user-1", testData.minT, mock.Anything, mock.Anything).Return(blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark{}, nil) + + // Mock TSDB querier (for remaining blocks) + mockTSDBQuerier := &mockParquetQuerier{} + + // Mock parquet querier (captures hints) + mockParquetQuerierInstance := &mockParquetQuerier{} + + // Create the parquetQuerierWithFallback + pq := &parquetQuerierWithFallback{ + minT: testData.minT, + maxT: testData.maxT, + finder: finder, + blocksStoreQuerier: mockTSDBQuerier, + parquetQuerier: mockParquetQuerierInstance, + queryStoreAfter: 0, // Disable queryStoreAfter manipulation + metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), + limits: defaultOverrides(t, 0), + logger: log.NewNopLogger(), + defaultBlockStoreType: parquetBlockStore, + fallbackDisabled: false, + } + + matchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric"), + } + + // Create input hints with projection + inputHints := &storage.SelectHints{ + Start: testData.minT, + End: testData.maxT, + ProjectionLabels: testData.inputProjectionLabels, + ProjectionInclude: testData.inputProjectionInclude, + } + + // Execute Select + set := pq.Select(ctx, false, inputHints, 
matchers...) + require.NotNil(t, set) + + // Verify the hints passed to the parquet querier + if !testData.hasRemainingBlocks { + // If all parquet blocks, verify hints passed to parquet querier + require.NotNil(t, mockParquetQuerierInstance.queriedHints, "parquet querier should have been called") + require.Equal(t, testData.expectedProjectionLabels, mockParquetQuerierInstance.queriedHints.ProjectionLabels, + "projection labels mismatch") + require.Equal(t, testData.expectedProjectionInclude, mockParquetQuerierInstance.queriedHints.ProjectionInclude, + "projection include flag mismatch") + } + }) + } +} + +func TestAllParquetBlocksHaveHashColumn(t *testing.T) { + block1 := ulid.MustNew(1, nil) + block2 := ulid.MustNew(2, nil) + block3 := ulid.MustNew(3, nil) + + tests := map[string]struct { + blocks []*bucketindex.Block + expected bool + }{ + "all blocks v2": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + {ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + }, + expected: true, + }, + "all blocks v1": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + {ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + }, + expected: false, + }, + "mixed versions v1 and v2": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + {ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + }, + expected: false, + }, + "one block nil parquet metadata": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + {ID: block2, Parquet: nil}, + }, + expected: false, + }, + "all blocks nil parquet metadata": { + blocks: 
[]*bucketindex.Block{ + {ID: block1, Parquet: nil}, + {ID: block2, Parquet: nil}, + }, + expected: false, + }, + "single block v2": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + }, + expected: true, + }, + "single block v1": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, + }, + expected: false, + }, + "empty blocks": { + blocks: []*bucketindex.Block{}, + expected: true, // No blocks with version < 2, so return true + }, + "all blocks v3 or higher": { + blocks: []*bucketindex.Block{ + {ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 3}}, + {ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 4}}, + {ID: block3, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion2}}, + }, + expected: true, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + result := allParquetBlocksHaveHashColumn(testData.blocks) + require.Equal(t, testData.expected, result, "unexpected result for %s", testName) + }) + } +} + func TestMaterializedLabelsFilterCallback(t *testing.T) { tests := []struct { name string @@ -799,7 +1023,7 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) { // Set up blocks where block1 has parquet metadata but block2 doesn't finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything, mock.Anything).Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, // Available as parquet &bucketindex.Block{ID: block2}, // Not available as parquet }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) @@ -858,8 +1082,8 @@ func TestParquetQueryableFallbackDisabled(t *testing.T) { // Set up blocks where both blocks 
have parquet metadata finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything, mock.Anything).Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet - &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, // Available as parquet + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: parquet.ParquetConverterMarkVersion1}}, // Available as parquet }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) t.Run("select should work without error", func(t *testing.T) { diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 636300d64df..922501d9310 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -99,6 +99,8 @@ type Config struct { ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"` DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"` + + HonorProjectionHints bool `yaml:"honor_projection_hints"` } var ( @@ -149,6 +151,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.") cfg.ParquetShardCache.RegisterFlagsWithPrefix("querier.", f) f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "[Experimental] Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.") + f.BoolVar(&cfg.HonorProjectionHints, "querier.honor-projection-hints", false, "[Experimental] If true, querier will honor projection hints and only materialize requested labels. 
Today, projection is only effective when Parquet Queryable is enabled. Projection is only applied when not querying mixed block types (parquet and non-parquet) and not querying ingesters.") f.BoolVar(&cfg.DistributedExecEnabled, "querier.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.") f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.") } @@ -303,6 +306,7 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, limits: limits, maxQueryIntoFuture: cfg.MaxQueryIntoFuture, ignoreMaxQueryLength: cfg.IgnoreMaxQueryLength, + honorProjectionHints: cfg.HonorProjectionHints, distributor: distributor, stores: stores, limiterHolder: &limiterHolder{}, @@ -313,13 +317,14 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, } type querier struct { - now time.Time - mint, maxt int64 - limits *validation.Overrides - maxQueryIntoFuture time.Duration - distributor QueryableWithFilter - stores []QueryableWithFilter - limiterHolder *limiterHolder + now time.Time + mint, maxt int64 + limits *validation.Overrides + maxQueryIntoFuture time.Duration + honorProjectionHints bool + distributor QueryableWithFilter + stores []QueryableWithFilter + limiterHolder *limiterHolder ignoreMaxQueryLength bool } @@ -435,6 +440,15 @@ func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.Select } } + // Reset projection hints if querying ingesters or projection is not included. + // Projection can only be applied when not querying mixed sources (ingester + store). 
+ if q.honorProjectionHints { + if !sp.ProjectionInclude || q.distributor.UseQueryable(q.now, mint, maxt) { + sp.ProjectionLabels = nil + sp.ProjectionInclude = false + } + } + if len(queriers) == 1 { return queriers[0].Select(ctx, sortSeries, sp, matchers...) } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index cf7855eafa3..bf3ff326213 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -1723,3 +1723,130 @@ func (m *mockQueryableWithFilter) UseQueryable(_ time.Time, _, _ int64) bool { m.useQueryableCalled = true return true } + +func TestQuerier_ProjectionHints(t *testing.T) { + t.Parallel() + start := time.Now().Add(-2 * time.Hour) + end := time.Now() + ctx := user.InjectOrgID(context.Background(), "0") + + tests := map[string]struct { + honorProjectionHints bool + inputProjectionInclude bool + inputProjectionLabels []string + queryIngesters bool // Whether ingesters should be queried + expectedProjectionInclude bool + expectedProjectionLabels []string + }{ + "projection preserved: honor enabled, projection included, not querying ingesters": { + honorProjectionHints: true, + inputProjectionInclude: true, + inputProjectionLabels: []string{"__name__", "job"}, + queryIngesters: false, + expectedProjectionInclude: true, + expectedProjectionLabels: []string{"__name__", "job"}, + }, + "projection reset: honor enabled, projection included, querying ingesters": { + honorProjectionHints: true, + inputProjectionInclude: true, + inputProjectionLabels: []string{"__name__", "job"}, + queryIngesters: true, + expectedProjectionInclude: false, + expectedProjectionLabels: nil, + }, + "projection reset: honor enabled, projection not included": { + honorProjectionHints: true, + inputProjectionInclude: false, + inputProjectionLabels: []string{"__name__", "job"}, + queryIngesters: false, + expectedProjectionInclude: false, + expectedProjectionLabels: nil, + }, + "projection not modified: honor disabled, projection included, querying 
ingesters": { + honorProjectionHints: false, + inputProjectionInclude: true, + inputProjectionLabels: []string{"__name__", "job"}, + queryIngesters: true, + expectedProjectionInclude: true, + expectedProjectionLabels: []string{"__name__", "job"}, + }, + "projection not modified: honor disabled, projection not included": { + honorProjectionHints: false, + inputProjectionInclude: false, + inputProjectionLabels: []string{"__name__", "job"}, + queryIngesters: false, + expectedProjectionInclude: false, + expectedProjectionLabels: []string{"__name__", "job"}, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + t.Parallel() + var cfg Config + flagext.DefaultValues(&cfg) + cfg.ActiveQueryTrackerDir = "" + cfg.HonorProjectionHints = testData.honorProjectionHints + + overrides := validation.NewOverrides(DefaultLimitsConfig(), nil) + + // Create mock store + chunkStore := &emptyChunkStore{} + storeQueryable := &wrappedSampleAndChunkQueryable{ + QueryableWithFilter: UseAlwaysQueryable(NewMockStoreQueryable(chunkStore)), + } + + // Create mock distributor + distributor := &MockDistributor{} + distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) + + // Create distributor queryable that can be controlled to be used or not + var distributorQueryable QueryableWithFilter + if testData.queryIngesters { + // Ingesters will be queried + distributorQueryable = newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, nil, 1) + } else { + // Ingesters will not be queried (time range is too old) + distributorQueryable = UseBeforeTimestampQueryable( + newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, nil, 1), + start.Add(-1*time.Hour), + ) + } + + 
wDistributorQueryable := &wrappedSampleAndChunkQueryable{QueryableWithFilter: distributorQueryable} + + queryable := NewQueryable(wDistributorQueryable, []QueryableWithFilter{storeQueryable}, cfg, overrides) + q, err := queryable.Querier(util.TimeToMillis(start), util.TimeToMillis(end)) + require.NoError(t, err) + + // Create select hints with projection + hints := &storage.SelectHints{ + Start: util.TimeToMillis(start), + End: util.TimeToMillis(end), + ProjectionInclude: testData.inputProjectionInclude, + ProjectionLabels: testData.inputProjectionLabels, + } + + matcher := labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test") + set := q.Select(ctx, false, hints, matcher) + require.False(t, set.Next()) // Expected to be empty + require.NoError(t, set.Err()) + + // Check the projection hints received by the underlying querier(s) + var receivedHints *storage.SelectHints + for _, queryable := range append([]QueryableWithFilter{storeQueryable}, wDistributorQueryable) { + wQueryable := queryable.(*wrappedSampleAndChunkQueryable) + if wQueryable.UseQueryable(time.Now(), util.TimeToMillis(start), util.TimeToMillis(end)) { + require.Len(t, wQueryable.queriers, 1) + require.Len(t, wQueryable.queriers[0].selectCallsArgs, 1) + receivedHints = wQueryable.queriers[0].selectCallsArgs[0][1].(*storage.SelectHints) + break + } + } + + require.NotNil(t, receivedHints, "should have received hints") + assert.Equal(t, testData.expectedProjectionInclude, receivedHints.ProjectionInclude, "ProjectionInclude mismatch") + assert.Equal(t, testData.expectedProjectionLabels, receivedHints.ProjectionLabels, "ProjectionLabels mismatch") + }) + } +} diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 30efc2eb96c..fbdca1b6fae 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -5772,6 +5772,12 @@ "type": "boolean", "x-cli-flag": "querier.enable-promql-experimental-functions" }, + "honor_projection_hints": { + 
"default": false, + "description": "[Experimental] If true, querier will honor projection hints and only materialize requested labels. Today, projection is only effective when Parquet Queryable is enabled. Projection is only applied when not querying mixed block types (parquet and non-parquet) and not querying ingesters.", + "type": "boolean", + "x-cli-flag": "querier.honor-projection-hints" + }, "ignore_max_query_length": { "default": false, "description": "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.", diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go index 24daad7ef26..01c854275cf 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go @@ -467,7 +467,8 @@ func (m *Materializer) MaterializeLabels(ctx context.Context, hints *prom_storag break } } - needsHash = !seriesHashExcluded + // Hash column is only needed if projection labels are provided and series hash is not excluded. 
+ needsHash = !seriesHashExcluded && len(hints.ProjectionLabels) > 0 for _, labelName := range hints.ProjectionLabels { if labelName == schema.SeriesHashColumn { diff --git a/vendor/modules.txt b/vendor/modules.txt index d4aecb1b3e7..91fa9c2d44c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -952,7 +952,7 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71 +# github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94 ## explicit; go 1.24.0 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/queryable