Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Config: Renamed `parquet_queryable_shard_cache_size` to `parquet_shard_cache_size` and `parquet_queryable_shard_cache_ttl` to `parquet_shard_cache_ttl`.
* [FEATURE] StoreGateway: Introduces a new parquet mode. #7046
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
* [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
* [ENHANCEMENT] StoreGateway: Add tracing to parquet mode. #7125
* [ENHANCEMENT] Alertmanager: Upgrade Alertmanager to 0.29.0 and add a new incidentIO integration. #7092
* [ENHANCEMENT] Querier: Add a `-querier.parquet-queryable-shard-cache-ttl` flag to add TTL to parquet shard cache. #7098
Expand Down
7 changes: 7 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,13 @@ querier:
# queryable.
# CLI flag: -querier.parquet-queryable-fallback-disabled
[parquet_queryable_fallback_disabled: <boolean> | default = false]

# [Experimental] If true, querier will honor projection hints and only
# materialize requested labels. Today, projection is only effective when
# Parquet Queryable is enabled. Projection is only applied when not querying
# mixed block types (parquet and non-parquet) and not querying ingesters.
# CLI flag: -querier.honor-projection-hints
[honor_projection_hints: <boolean> | default = false]
```

### `blocks_storage_config`
Expand Down
7 changes: 7 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4827,6 +4827,13 @@ thanos_engine:
# need to make sure Parquet files are created before it is queryable.
# CLI flag: -querier.parquet-queryable-fallback-disabled
[parquet_queryable_fallback_disabled: <boolean> | default = false]

# [Experimental] If true, querier will honor projection hints and only
# materialize requested labels. Today, projection is only effective when Parquet
# Queryable is enabled. Projection is only applied when not querying mixed block
# types (parquet and non-parquet) and not querying ingesters.
# CLI flag: -querier.honor-projection-hints
[honor_projection_hints: <boolean> | default = false]
```

### `query_frontend_config`
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/oklog/ulid/v2 v2.1.1
github.com/parquet-go/parquet-go v0.25.1
github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71
github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94
github.com/prometheus/client_golang/exp v0.0.0-20250914183048-a974e0d45e0a
github.com/prometheus/procfs v0.16.1
github.com/sercand/kuberesolver/v5 v5.1.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1634,8 +1634,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71 h1:BwrzRNGy0GbnBA7rQd85G6NuFvydvwTXxRB9XiA5TXk=
github.com/prometheus-community/parquet-common v0.0.0-20251205214622-b9865c513b71/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY=
github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94 h1:6WmPxbqGMjBKLOZvurIZR5eEBF0Rd0t1oQ06PMWaHe8=
github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY=
github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4=
github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis=
github.com/prometheus/alertmanager v0.29.0 h1:/ET4NmAGx2Dv9kStrXIBqBgHyiSgIk4OetY+hoZRfgc=
Expand Down
224 changes: 224 additions & 0 deletions integration/parquet_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"fmt"
"math/rand"
"path/filepath"
"slices"
"strconv"
"testing"
"time"

"github.com/cortexproject/promqlsmith"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
Expand All @@ -23,7 +25,9 @@ import (
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util/log"
cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
)
Expand Down Expand Up @@ -176,3 +180,223 @@ func TestParquetFuzz(t *testing.T) {
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
}

// TestParquetProjectionPushdownFuzz starts a single-binary Cortex (with the
// parquet-converter target) configured with the Thanos engine, the projection
// optimizer and -querier.honor-projection-hints, uploads one TSDB block that is
// old enough to be outside the ingester query window, waits for it to be
// converted to parquet and listed in the bucket index, and then runs a set of
// instant queries asserting that every returned series carries exactly the
// expected label names.
func TestParquetProjectionPushdownFuzz(t *testing.T) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	consul := e2edb.NewConsulWithName("consul")
	memcached := e2ecache.NewMemcached()
	require.NoError(t, s.StartAndWaitReady(consul, memcached))

	baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
	flags := mergeFlags(
		baseFlags,
		map[string]string{
			"-target": "all,parquet-converter",
			"-blocks-storage.tsdb.block-ranges-period":                            "1m,24h",
			"-blocks-storage.tsdb.ship-interval":                                  "1s",
			"-blocks-storage.bucket-store.sync-interval":                          "1s",
			"-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s",
			"-blocks-storage.bucket-store.bucket-index.idle-timeout":              "1s",
			"-blocks-storage.bucket-store.bucket-index.enabled":                   "true",
			"-blocks-storage.bucket-store.index-cache.backend":                    tsdb.IndexCacheBackendInMemory,
			"-querier.query-store-for-labels-enabled":                             "true",
			// compactor
			"-compactor.cleanup-interval": "1s",
			// Ingester.
			"-ring.store":      "consul",
			"-consul.hostname": consul.NetworkHTTPEndpoint(),
			// Distributor.
			"-distributor.replication-factor": "1",
			// Store-gateway.
			"-store-gateway.sharding-enabled":   "false",
			"--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways
			// alert manager
			"-alertmanager.web.external-url": "http://localhost/alertmanager",
			// parquet-converter
			"-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
			"-parquet-converter.conversion-interval":  "1s",
			"-parquet-converter.enabled":              "true",
			// Querier - Enable Thanos engine with projection optimizer
			"-querier.thanos-engine":            "true",
			"-querier.optimizers":               "propagate-matchers,sort-matchers,merge-selects,detect-histogram-stats,projection", // Enable all optimizers including projection
			"-querier.enable-parquet-queryable": "true",
			"-querier.honor-projection-hints":   "true", // Honor projection hints
			// Set query-ingesters-within to 2h so queries older than 2h don't hit ingesters.
			// The test data and queries are 48-72h old, so they won't query ingesters and
			// projection will be enabled.
			"-querier.query-ingesters-within": "2h",
			// Enable cache for parquet labels and chunks
			"-blocks-storage.bucket-store.parquet-labels-cache.backend":             "inmemory,memcached",
			"-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
			"-blocks-storage.bucket-store.chunks-cache.backend":                     "inmemory,memcached",
			"-blocks-storage.bucket-store.chunks-cache.memcached.addresses":         "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
		},
	)

	// make alert manager config dir
	require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

	ctx := context.Background()
	rnd := rand.New(rand.NewSource(time.Now().Unix()))
	dir := filepath.Join(s.SharedDir(), "data")
	numSeries := 20
	numSamples := 100
	lbls := make([]labels.Labels, 0, numSeries)
	scrapeInterval := time.Minute
	statusCodes := []string{"200", "400", "404", "500", "502"}
	methods := []string{"GET", "POST", "PUT", "DELETE"}
	now := time.Now()
	// Make sure query time is old enough to not overlap with ingesters.
	start := now.Add(-time.Hour * 72)
	end := now.Add(-time.Hour * 48)

	// Create series with multiple labels
	for i := range numSeries {
		lbls = append(lbls, labels.FromStrings(
			labels.MetricName, "http_requests_total",
			"job", "api-server",
			"instance", fmt.Sprintf("instance-%d", i%5),
			"status_code", statusCodes[i%len(statusCodes)],
			"method", methods[i%len(methods)],
			"path", fmt.Sprintf("/api/v1/endpoint%d", i%3),
			"cluster", "test-cluster",
		))
	}

	id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
	require.NoError(t, err)
	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, s.StartAndWaitReady(minio))

	cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
	require.NoError(t, s.StartAndWaitReady(cortex))

	storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, err)
	bkt := storage.GetBucket()
	userBucket := bucket.NewUserBucketClient("user-1", bkt, nil)

	err = block.Upload(ctx, log.Logger, userBucket, filepath.Join(dir, id.String()), metadata.NoneFunc)
	require.NoError(t, err)

	// Wait until we convert the blocks to parquet AND bucket index is updated
	cortex_testutil.Poll(t, 300*time.Second, true, func() any {
		// Check if parquet marker exists
		markerFound := false
		err := userBucket.Iter(ctx, "", func(name string) error {
			if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) {
				markerFound = true
			}
			return nil
		}, objstore.WithRecursiveIter())
		if err != nil || !markerFound {
			return false
		}

		// Check if bucket index exists AND contains the parquet block metadata
		idx, err := bucketindex.ReadIndex(ctx, bkt, "user-1", nil, log.Logger)
		if err != nil {
			return false
		}

		// Verify the block is in the bucket index with parquet metadata
		for _, b := range idx.Blocks {
			if b.ID == id && b.Parquet != nil {
				require.True(t, b.Parquet.Version == cortex_parquet.CurrentVersion,
					"unexpected parquet version in bucket index: %v", b.Parquet.Version)
				return true
			}
		}
		return false
	})

	c, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
	require.NoError(t, err)

	// Wait for data to be queryable before running the projection hints tests
	cortex_testutil.Poll(t, 60*time.Second, true, func() any {
		labelSets, err := c.Series([]string{`{job="api-server"}`}, start, end)
		if err != nil {
			t.Logf("Series query failed: %v", err)
			return false
		}
		return len(labelSets) > 0
	})

	testCases := []struct {
		name           string
		query          string
		expectedLabels []string // Exact set of label names every result series must carry
	}{
		{
			name:           "vector selector query should not use projection",
			query:          `http_requests_total`,
			expectedLabels: []string{"__name__", "job", "instance", "status_code", "method", "path", "cluster"},
		},
		{
			name:           "simple_sum_by_job",
			query:          `sum by (job) (http_requests_total)`,
			expectedLabels: []string{"job"},
		},
		{
			name:           "rate_with_aggregation",
			query:          `sum by (method) (rate(http_requests_total[5m]))`,
			expectedLabels: []string{"method"},
		},
		{
			name:           "multiple_grouping_labels",
			query:          `sum by (job, status_code) (http_requests_total)`,
			expectedLabels: []string{"job", "status_code"},
		},
		{
			name:           "aggregation without query",
			query:          `sum without (instance, method) (http_requests_total)`,
			expectedLabels: []string{"job", "status_code", "path", "cluster"},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			t.Logf("Testing: %s", tc.query)

			// Execute instant query
			result, err := c.Query(tc.query, end)
			require.NoError(t, err)
			require.NotNil(t, result)

			// Verify we got results
			vector, ok := result.(model.Vector)
			require.True(t, ok, "result should be a vector")
			require.NotEmpty(t, vector, "query should return results")

			t.Logf("Query returned %d series", len(vector))

			// Verify projection worked: series should only have the expected labels
			for _, sample := range vector {
				actualLabels := make(map[string]struct{}, len(sample.Metric))
				for label := range sample.Metric {
					actualLabels[string(label)] = struct{}{}
				}

				// Check that all expected labels are present
				for _, expectedLabel := range tc.expectedLabels {
					_, ok := actualLabels[expectedLabel]
					require.True(t, ok,
						"series should have %s label", expectedLabel)
				}

				// Check that no unexpected labels are present
				for lbl := range actualLabels {
					if !slices.Contains(tc.expectedLabels, lbl) {
						// Failf (not Fail) so the label name is interpolated into the message.
						require.Failf(t, "unexpected label", "series should not have unexpected label: %s", lbl)
					}
				}
			}
		})
	}

	// Verify that parquet blocks were queried
	require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
}
2 changes: 1 addition & 1 deletion integration/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector3, result.(model.Vector))

if testCfg.bucketStorageType == "tedb" {
if testCfg.bucketStorageType == "tsdb" {
// Check the in-memory index cache metrics (in the store-gateway).
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64((5+5+2)*numberOfCacheBackends)), "thanos_store_index_cache_requests_total"))
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty
Expand Down
21 changes: 21 additions & 0 deletions pkg/querier/parquet_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querysharding"
"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util"
Expand Down Expand Up @@ -147,6 +148,7 @@ func NewParquetQueryable(
}

parquetQueryableOpts := []queryable.QueryableOpts{
queryable.WithHonorProjectionHints(config.HonorProjectionHints),
queryable.WithRowCountLimitFunc(func(ctx context.Context) int64 {
// Ignore error as this shouldn't happen.
// If failed to resolve tenant we will just use the default limit value.
Expand Down Expand Up @@ -499,6 +501,14 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
sortSeries = true
}

// Reset projection hints if:
// - there are mixed blocks (both parquet and non-parquet)
// - not all parquet blocks have hash column (version < 2)
if len(remaining) > 0 || !allParquetBlocksHaveHashColumn(parquet) {
hints.ProjectionLabels = nil
hints.ProjectionInclude = false
}

promises := make([]chan storage.SeriesSet, 0, 2)

if len(parquet) > 0 {
Expand Down Expand Up @@ -598,6 +608,17 @@ func (q *parquetQuerierWithFallback) incrementOpsMetric(method string, remaining
}
}

// allParquetBlocksHaveHashColumn reports whether every block in blocks carries
// parquet metadata with a version of at least ParquetConverterMarkVersion2.
// Version 1 parquet blocks lack the hash column, so projection cannot be
// enabled when any such block (or a block without parquet metadata) is present.
// An empty slice yields true.
func allParquetBlocksHaveHashColumn(blocks []*bucketindex.Block) bool {
	for i := range blocks {
		meta := blocks[i].Parquet
		if meta != nil && meta.Version >= cortex_parquet.ParquetConverterMarkVersion2 {
			continue
		}
		// Missing parquet metadata or a pre-v2 block: no hash column available.
		return false
	}
	return true
}

type shardMatcherLabelsFilter struct {
shardMatcher *storepb.ShardMatcher
}
Expand Down
Loading
Loading