From 96f796d37221af5d127cab15cfe2060bddef9ac7 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 12 Mar 2026 14:30:07 -0300 Subject: [PATCH 1/8] add mutex around cached_result --- src/Storages/IPartitionStrategy.cpp | 14 ++++++++++++-- src/Storages/IPartitionStrategy.h | 2 ++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/Storages/IPartitionStrategy.cpp b/src/Storages/IPartitionStrategy.cpp index 8a4ac6f12df2..bcb92330b21a 100644 --- a/src/Storages/IPartitionStrategy.cpp +++ b/src/Storages/IPartitionStrategy.cpp @@ -193,8 +193,11 @@ const KeyDescription & IPartitionStrategy::getPartitionKeyDescription() const IPartitionStrategy::PartitionExpressionActionsAndColumnName IPartitionStrategy::getPartitionExpressionActions(ASTPtr & expression_ast) { - if (cached_result) - return *cached_result; + { + std::lock_guard lock(cached_result_mutex); + if (cached_result) + return *cached_result; + } auto syntax_result = TreeRewriter(context).analyze(expression_ast, sample_block.getNamesAndTypesList()); auto actions_dag = ExpressionAnalyzer(expression_ast, syntax_result, context).getActionsDAG(false); @@ -205,7 +208,14 @@ IPartitionStrategy::getPartitionExpressionActions(ASTPtr & expression_ast) result.column_name = expression_ast->getColumnName(); if (!result.actions->getActionsDAG().hasNonDeterministic()) + { + std::lock_guard lock(cached_result_mutex); + + if (cached_result) + return *cached_result; + cached_result = result; + } return result; } diff --git a/src/Storages/IPartitionStrategy.h b/src/Storages/IPartitionStrategy.h index 91397de2362d..1e621f79b8e0 100644 --- a/src/Storages/IPartitionStrategy.h +++ b/src/Storages/IPartitionStrategy.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB { @@ -55,6 +56,7 @@ struct IPartitionStrategy const Block sample_block; ContextPtr context; + std::mutex cached_result_mutex; std::optional cached_result; }; From 9ee7fd06bab12cb0f1a42f6a2e561b763cfeb3c1 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 12 Mar 2026 15:56:04 -0300 Subject: [PATCH 2/8] change --- src/Storages/IPartitionStrategy.cpp | 40 ++++++++++++++++------------- src/Storages/IPartitionStrategy.h | 2 -- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/src/Storages/IPartitionStrategy.cpp b/src/Storages/IPartitionStrategy.cpp index bcb92330b21a..d7ca30d0fff4 100644 --- a/src/Storages/IPartitionStrategy.cpp +++ b/src/Storages/IPartitionStrategy.cpp @@ -193,12 +193,6 @@ const KeyDescription & IPartitionStrategy::getPartitionKeyDescription() const IPartitionStrategy::PartitionExpressionActionsAndColumnName IPartitionStrategy::getPartitionExpressionActions(ASTPtr & expression_ast) { - { - std::lock_guard lock(cached_result_mutex); - if (cached_result) - return *cached_result; - } - auto syntax_result = TreeRewriter(context).analyze(expression_ast, sample_block.getNamesAndTypesList()); auto actions_dag = ExpressionAnalyzer(expression_ast, syntax_result, context).getActionsDAG(false); @@ -207,16 +201,6 @@ IPartitionStrategy::getPartitionExpressionActions(ASTPtr & expression_ast) std::move(actions_dag), ExpressionActionsSettings(context), false); result.column_name = expression_ast->getColumnName(); - if (!result.actions->getActionsDAG().hasNonDeterministic()) - { - std::lock_guard lock(cached_result_mutex); - - if (cached_result) - return *cached_result; - - cached_result = result; - } - return result; } @@ -269,6 +253,15 @@ std::shared_ptr PartitionStrategyFactory::get(StrategyType s WildcardPartitionStrategy::WildcardPartitionStrategy(KeyDescription partition_key_description_, const Block & sample_block_, ContextPtr context_) : IPartitionStrategy(partition_key_description_, sample_block_, context_) { + ASTs arguments(1, partition_key_description.definition_ast); + ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments)); + auto actions_with_column = getPartitionExpressionActions(partition_by_string); + + /// if we can cache it, do it + if (!actions_with_column.actions->getActionsDAG().hasNonDeterministic()) + { + cached_result = actions_with_column; + } } ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk) @@ -310,12 +303,22 @@ HiveStylePartitionStrategy::HiveStylePartitionStrategy( } block_without_partition_columns = buildBlockWithoutPartitionColumns(sample_block, partition_columns_name_set); + + ASTs arguments(1, partition_key_description.definition_ast); + ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments)); + auto actions_with_column = getPartitionExpressionActions(partition_by_string); + + /// if we can cache it, do it + if (!actions_with_column.actions->getActionsDAG().hasNonDeterministic()) + { + cached_result = actions_with_column; + } } ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk) { auto hive_ast = buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); - auto actions_with_column = getPartitionExpressionActions(hive_ast); + auto actions_with_column = cached_result.value_or(getPartitionExpressionActions(hive_ast)); Block block_with_partition_by_expr = sample_block.cloneWithoutColumns(); block_with_partition_by_expr.setColumns(chunk.getColumns()); @@ -327,7 +330,8 @@ ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk) ColumnPtr HiveStylePartitionStrategy::computePartitionKey(Block & block) { auto hive_ast = buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); - auto actions_with_column = getPartitionExpressionActions(hive_ast); + auto actions_with_column = cached_result.value_or(getPartitionExpressionActions(hive_ast)); + actions_with_column.actions->execute(block); return block.getByName(actions_with_column.column_name).column; } diff --git a/src/Storages/IPartitionStrategy.h b/src/Storages/IPartitionStrategy.h index 1e621f79b8e0..91397de2362d 100644 --- a/src/Storages/IPartitionStrategy.h +++ b/src/Storages/IPartitionStrategy.h @@ -4,7 +4,6 @@ #include #include #include -#include namespace DB { @@ -56,7 +55,6 @@ struct IPartitionStrategy const Block sample_block; ContextPtr context; - std::mutex cached_result_mutex; std::optional cached_result; }; From 1a9bfa1620c43367da85df2a5b973fba3a8d129e Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Fri, 13 Mar 2026 09:57:06 -0300 Subject: [PATCH 3/8] real fix --- src/Storages/IPartitionStrategy.cpp | 63 +++++++++++++++++++++++------ 1 file changed, 50 insertions(+), 13 deletions(-) diff --git a/src/Storages/IPartitionStrategy.cpp b/src/Storages/IPartitionStrategy.cpp index d7ca30d0fff4..af88f2cc7d2b 100644 --- a/src/Storages/IPartitionStrategy.cpp +++ b/src/Storages/IPartitionStrategy.cpp @@ -266,9 +266,18 @@ WildcardPartitionStrategy::WildcardPartitionStrategy(KeyDescription partition_ke ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk) { - ASTs arguments(1, partition_key_description.definition_ast); - ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments)); - auto actions_with_column = getPartitionExpressionActions(partition_by_string); + PartitionExpressionActionsAndColumnName actions_with_column; + + if (cached_result) + { + actions_with_column = cached_result.value(); + } + else + { + ASTs arguments(1, partition_key_description.definition_ast); + ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments)); + actions_with_column = getPartitionExpressionActions(partition_by_string); + } Block block_with_partition_by_expr = sample_block.cloneWithoutColumns(); block_with_partition_by_expr.setColumns(chunk.getColumns()); @@ -279,9 +288,19 @@ ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk) ColumnPtr WildcardPartitionStrategy::computePartitionKey(Block & block) { - ASTs arguments(1, partition_key_description.definition_ast); - ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments)); - auto actions_with_column = getPartitionExpressionActions(partition_by_string); + PartitionExpressionActionsAndColumnName actions_with_column; + + if (cached_result) + { + actions_with_column = cached_result.value(); + } + else + { + ASTs arguments(1, partition_key_description.definition_ast); + ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments)); + actions_with_column = getPartitionExpressionActions(partition_by_string); + } + actions_with_column.actions->execute(block); return block.getByName(actions_with_column.column_name).column; } @@ -304,9 +323,8 @@ HiveStylePartitionStrategy::HiveStylePartitionStrategy( block_without_partition_columns = buildBlockWithoutPartitionColumns(sample_block, partition_columns_name_set); - ASTs arguments(1, partition_key_description.definition_ast); - ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments)); - auto actions_with_column = getPartitionExpressionActions(partition_by_string); + auto hive_ast = buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); + auto actions_with_column = getPartitionExpressionActions(hive_ast); /// if we can cache it, do it if (!actions_with_column.actions->getActionsDAG().hasNonDeterministic()) @@ -317,8 +335,18 @@ HiveStylePartitionStrategy::HiveStylePartitionStrategy( ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk) { - auto hive_ast = buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); - auto actions_with_column = cached_result.value_or(getPartitionExpressionActions(hive_ast)); + + PartitionExpressionActionsAndColumnName actions_with_column; + + if (cached_result) + { + actions_with_column = cached_result.value(); + } + else + { + auto hive_ast = buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); + actions_with_column = getPartitionExpressionActions(hive_ast); + } Block block_with_partition_by_expr = sample_block.cloneWithoutColumns(); block_with_partition_by_expr.setColumns(chunk.getColumns()); @@ -329,8 +357,17 @@ ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk) ColumnPtr HiveStylePartitionStrategy::computePartitionKey(Block & block) { - auto hive_ast = buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); - auto actions_with_column = cached_result.value_or(getPartitionExpressionActions(hive_ast)); + PartitionExpressionActionsAndColumnName actions_with_column; + + if (cached_result) + { + actions_with_column = cached_result.value(); + } + else + { + auto hive_ast = buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); + actions_with_column = getPartitionExpressionActions(hive_ast); + } actions_with_column.actions->execute(block); return block.getByName(actions_with_column.column_name).column; From 06beea0c866c528a46c91e044bc455f11362df80 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Fri, 13 Mar 2026 10:42:07 -0300 Subject: [PATCH 4/8] reduce code duplication --- src/Storages/IPartitionStrategy.cpp | 129 +++++++++++++--------------- src/Storages/IPartitionStrategy.h | 14 +-- 2 files changed, 67 insertions(+), 76 deletions(-) diff --git a/src/Storages/IPartitionStrategy.cpp b/src/Storages/IPartitionStrategy.cpp index af88f2cc7d2b..02f310a9e23b 100644 --- a/src/Storages/IPartitionStrategy.cpp +++ b/src/Storages/IPartitionStrategy.cpp @@ -22,6 +22,8 @@ extern const int BAD_ARGUMENTS; namespace { + using PartitionExpressionActionsAndColumnName = IPartitionStrategy::PartitionExpressionActionsAndColumnName; + /// Builds AST for hive partition path format /// `partition_column_1=toString(partition_value_expr_1)/ ... /partition_column_N=toString(partition_value_expr_N)/` /// for given partition columns list and a partition by AST. @@ -89,6 +91,33 @@ namespace return result; } + ASTPtr buildToStringPartitionAST(ASTPtr partition_by) + { + ASTs arguments(1, partition_by); + return makeASTFunction("toString", std::move(arguments)); + } + + template + PartitionExpressionActionsAndColumnName getCachedOrBuildActions( + const std::optional & cached_result, + const IPartitionStrategy & partition_strategy, + BuildAST && build_ast) + { + if (cached_result) + return *cached_result; + + auto expression_ast = build_ast(); + return partition_strategy.getPartitionExpressionActions(expression_ast); + } + + void cacheDeterministicActions( + std::optional & cached_result, + const PartitionExpressionActionsAndColumnName & actions_with_column) + { + if (!actions_with_column.actions->getActionsDAG().hasNonDeterministic()) + cached_result = actions_with_column; + } + std::shared_ptr createHivePartitionStrategy( ASTPtr partition_by, const Block & sample_block, @@ -191,7 +220,7 @@ const KeyDescription & IPartitionStrategy::getPartitionKeyDescription() const } IPartitionStrategy::PartitionExpressionActionsAndColumnName -IPartitionStrategy::getPartitionExpressionActions(ASTPtr & expression_ast) +IPartitionStrategy::getPartitionExpressionActions(ASTPtr & expression_ast) const { auto syntax_result = TreeRewriter(context).analyze(expression_ast, sample_block.getNamesAndTypesList()); auto actions_dag = ExpressionAnalyzer(expression_ast, syntax_result, context).getActionsDAG(false); @@ -253,31 +282,19 @@ std::shared_ptr PartitionStrategyFactory::get(StrategyType s WildcardPartitionStrategy::WildcardPartitionStrategy(KeyDescription partition_key_description_, const Block & sample_block_, ContextPtr context_) : IPartitionStrategy(partition_key_description_, sample_block_, context_) { - ASTs arguments(1, partition_key_description.definition_ast); - ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments)); - auto actions_with_column = getPartitionExpressionActions(partition_by_string); - - /// if we can cache it, do it - if (!actions_with_column.actions->getActionsDAG().hasNonDeterministic()) - { - cached_result = actions_with_column; - } + auto actions_with_column = getCachedOrBuildActions( + cached_result, + *this, + [&] { return buildToStringPartitionAST(partition_key_description.definition_ast); }); + cacheDeterministicActions(cached_result, actions_with_column); } -ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk) +ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk) const { - PartitionExpressionActionsAndColumnName actions_with_column; - - if (cached_result) - { - actions_with_column = cached_result.value(); - } - else - { - ASTs arguments(1, partition_key_description.definition_ast); - ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments)); - actions_with_column = getPartitionExpressionActions(partition_by_string); - } + auto actions_with_column = getCachedOrBuildActions( + cached_result, + *this, + [&] { return buildToStringPartitionAST(partition_key_description.definition_ast); }); Block block_with_partition_by_expr = sample_block.cloneWithoutColumns(); block_with_partition_by_expr.setColumns(chunk.getColumns()); @@ -286,20 +303,12 @@ ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk) return block_with_partition_by_expr.getByName(actions_with_column.column_name).column; } -ColumnPtr WildcardPartitionStrategy::computePartitionKey(Block & block) +ColumnPtr WildcardPartitionStrategy::computePartitionKey(Block & block) const { - PartitionExpressionActionsAndColumnName actions_with_column; - - if (cached_result) - { - actions_with_column = cached_result.value(); - } - else - { - ASTs arguments(1, partition_key_description.definition_ast); - ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments)); - actions_with_column = getPartitionExpressionActions(partition_by_string); - } + auto actions_with_column = getCachedOrBuildActions( + cached_result, + *this, + [&] { return buildToStringPartitionAST(partition_key_description.definition_ast); }); actions_with_column.actions->execute(block); return block.getByName(actions_with_column.column_name).column; @@ -323,30 +332,19 @@ HiveStylePartitionStrategy::HiveStylePartitionStrategy( block_without_partition_columns = buildBlockWithoutPartitionColumns(sample_block, partition_columns_name_set); - auto hive_ast = buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); - auto actions_with_column = getPartitionExpressionActions(hive_ast); - - /// if we can cache it, do it - if (!actions_with_column.actions->getActionsDAG().hasNonDeterministic()) - { - cached_result = actions_with_column; - } + auto actions_with_column = getCachedOrBuildActions( + cached_result, + *this, + [&] { return buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); }); + cacheDeterministicActions(cached_result, actions_with_column); } -ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk) +ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk) const { - - PartitionExpressionActionsAndColumnName actions_with_column; - - if (cached_result) - { - actions_with_column = cached_result.value(); - } - else - { - auto hive_ast = buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); - actions_with_column = getPartitionExpressionActions(hive_ast); - } + auto actions_with_column = getCachedOrBuildActions( + cached_result, + *this, + [&] { return buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); }); Block block_with_partition_by_expr = sample_block.cloneWithoutColumns(); block_with_partition_by_expr.setColumns(chunk.getColumns()); @@ -355,19 +353,12 @@ ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk) return block_with_partition_by_expr.getByName(actions_with_column.column_name).column; } -ColumnPtr HiveStylePartitionStrategy::computePartitionKey(Block & block) +ColumnPtr HiveStylePartitionStrategy::computePartitionKey(Block & block) const { - PartitionExpressionActionsAndColumnName actions_with_column; - - if (cached_result) - { - actions_with_column = cached_result.value(); - } - else - { - auto hive_ast = buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); - actions_with_column = getPartitionExpressionActions(hive_ast); - } + auto actions_with_column = getCachedOrBuildActions( + cached_result, + *this, + [&] { return buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); }); actions_with_column.actions->execute(block); return block.getByName(actions_with_column.column_name).column; diff --git a/src/Storages/IPartitionStrategy.h b/src/Storages/IPartitionStrategy.h index 91397de2362d..1378762c6911 100644 --- a/src/Storages/IPartitionStrategy.h +++ b/src/Storages/IPartitionStrategy.h @@ -27,9 +27,9 @@ struct IPartitionStrategy virtual ~IPartitionStrategy() = default; - virtual ColumnPtr computePartitionKey(const Chunk & chunk) = 0; + virtual ColumnPtr computePartitionKey(const Chunk & chunk) const = 0; - virtual ColumnPtr computePartitionKey(Block & block) = 0; + virtual ColumnPtr computePartitionKey(Block & block) const = 0; virtual ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk) { @@ -48,7 +48,7 @@ struct IPartitionStrategy NamesAndTypesList getPartitionColumns() const; const KeyDescription & getPartitionKeyDescription() const; - PartitionExpressionActionsAndColumnName getPartitionExpressionActions(ASTPtr & expression_ast); + PartitionExpressionActionsAndColumnName getPartitionExpressionActions(ASTPtr & expression_ast) const; protected: const KeyDescription partition_key_description; @@ -91,9 +91,9 @@ struct WildcardPartitionStrategy : IPartitionStrategy { WildcardPartitionStrategy(KeyDescription partition_key_description_, const Block & sample_block_, ContextPtr context_); - ColumnPtr computePartitionKey(const Chunk & chunk) override; + ColumnPtr computePartitionKey(const Chunk & chunk) const override; - ColumnPtr computePartitionKey(Block & block) override; + ColumnPtr computePartitionKey(Block & block) const override; }; /* @@ -110,9 +110,9 @@ struct HiveStylePartitionStrategy : IPartitionStrategy const std::string & file_format_, bool partition_columns_in_data_file_); - ColumnPtr computePartitionKey(const Chunk & chunk) override; + ColumnPtr computePartitionKey(const Chunk & chunk) const override; - ColumnPtr computePartitionKey(Block & block) override; + ColumnPtr computePartitionKey(Block & block) const override; ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk) override; Block getFormatHeader() override; From 8f40492e0757a1d14566716efbb6baa9d4bd125f Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Fri, 13 Mar 2026 14:36:49 -0300 Subject: [PATCH 5/8] poll --- ..._part_limits_and_table_functions.reference | 2 ++ ...ge_tree_part_limits_and_table_functions.sh | 31 +++++++++++++++++-- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.reference b/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.reference index ce4f112ad1fa..b7f1f4411bf6 100644 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.reference +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.reference @@ -1,6 +1,8 @@ ---- Test max_bytes and max_rows per file ---- Table function with schema inheritance (no schema specified) ---- Table function with explicit compatible schema +Waiting for exports to complete (timeout: 60s)... +All exports completed. ---- Count files in big_destination_max_bytes, should be 5 (4 parquet, 1 commit) 5 ---- Count rows in big_table and big_destination_max_bytes diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.sh b/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.sh index 449720bbf7a3..16c86569f82b 100755 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.sh +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.sh @@ -55,8 +55,35 @@ query "ALTER TABLE $mt_table_tf EXPORT PART '2022_1_1_0' TO TABLE FUNCTION s3(s3 echo "---- Table function with explicit compatible schema" query "ALTER TABLE $mt_table_tf EXPORT PART '2023_2_2_0' TO TABLE FUNCTION s3(s3_conn, filename='$tf_schema_explicit', format='Parquet', structure='id UInt64, value String, year UInt16', partition_strategy='hive') PARTITION BY year SETTINGS allow_experimental_export_merge_tree_part = 1" -# ONE BIG SLEEP after all exports (longer because it writes multiple files) -sleep 20 +# Wait for all exports to complete +wait_for_exports() { + local timeout=${1:-60} + local poll_interval=${2:-0.5} + local start_time=$(date +%s) + local elapsed=0 + + echo "Waiting for exports to complete (timeout: ${timeout}s)..." + + while [ $elapsed -lt $timeout ]; do + # Check if any exports are still in progress for our tables/parts + local active_exports=$(query "SELECT count() FROM system.exports WHERE (source_table = '$big_table' AND part_name IN ('$big_part_max_bytes', '$big_part_max_rows')) OR (source_table = '$mt_table_tf' AND part_name IN ('2022_1_1_0', '2023_2_2_0'))" | tr -d '\n') + + if [ "$active_exports" = "0" ]; then + echo "All exports completed." + return 0 + fi + + sleep $poll_interval + elapsed=$(($(date +%s) - start_time)) + done + + echo "Timeout waiting for exports to complete after ${timeout}s" + echo "Remaining exports:" + query "SELECT source_table, part_name, elapsed, rows_read, total_rows_to_read FROM system.exports WHERE (source_table = '$big_table' AND part_name IN ('$big_part_max_bytes', '$big_part_max_rows')) OR (source_table = '$mt_table_tf' AND part_name IN ('2022_1_1_0', '2023_2_2_0'))" + return 1 +} + +wait_for_exports 60 # ============================================================================ # ALL SELECTS/VERIFICATIONS HAPPEN HERE From c4290b178c05af5b2e64984d5b22aaca1501102f Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Fri, 13 Mar 2026 14:46:14 -0300 Subject: [PATCH 6/8] naming issue for flaky check --- .../test.py | 42 ++++++++++++------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/tests/integration/test_export_merge_tree_part_to_object_storage/test.py b/tests/integration/test_export_merge_tree_part_to_object_storage/test.py index 1539cb130598..8adec2908d1c 100644 --- a/tests/integration/test_export_merge_tree_part_to_object_storage/test.py +++ b/tests/integration/test_export_merge_tree_part_to_object_storage/test.py @@ -52,8 +52,10 @@ def test_drop_column_during_export_snapshot(cluster): skip_if_remote_database_disk_enabled(cluster) node = cluster.instances["node1"] - mt_table = "mutations_snapshot_mt_table" - s3_table = "mutations_snapshot_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + + mt_table = f"mutations_snapshot_mt_table_{postfix}" + s3_table = f"mutations_snapshot_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table) @@ -104,8 +106,10 @@ def test_add_column_during_export(cluster): skip_if_remote_database_disk_enabled(cluster) node = cluster.instances["node1"] - mt_table = "add_column_during_export_mt_table" - s3_table = "add_column_during_export_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + + mt_table = f"add_column_during_export_mt_table_{postfix}" + s3_table = f"add_column_during_export_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table) @@ -157,8 +161,10 @@ def test_pending_mutations_throw_before_export(cluster): """Test that pending mutations before export throw an error with default settings.""" node = cluster.instances["node1"] - mt_table = "pending_mutations_throw_mt_table" - s3_table = "pending_mutations_throw_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + + mt_table = f"pending_mutations_throw_mt_table_{postfix}" + s3_table = f"pending_mutations_throw_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table) @@ -180,8 +186,10 @@ def test_pending_mutations_skip_before_export(cluster): """Test that pending mutations before export are skipped with throw_on_pending_mutations=false.""" node = cluster.instances["node1"] - mt_table = "pending_mutations_skip_mt_table" - s3_table = "pending_mutations_skip_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + + mt_table = f"pending_mutations_skip_mt_table_{postfix}" + s3_table = f"pending_mutations_skip_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table) @@ -210,8 +218,10 @@ def test_data_mutations_after_export_started(cluster): skip_if_remote_database_disk_enabled(cluster) node = cluster.instances["node1"] - mt_table = "mutations_after_export_mt_table" - s3_table = "mutations_after_export_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + + mt_table = f"mutations_after_export_mt_table_{postfix}" + s3_table = f"mutations_after_export_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table) @@ -256,8 +266,10 @@ def test_pending_patch_parts_throw_before_export(cluster): """Test that pending patch parts before export throw an error with default settings.""" node = cluster.instances["node1"] - mt_table = "pending_patches_throw_mt_table" - s3_table = "pending_patches_throw_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + + mt_table = f"pending_patches_throw_mt_table_{postfix}" + s3_table = f"pending_patches_throw_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table) @@ -279,8 +291,10 @@ def test_pending_patch_parts_skip_before_export(cluster): """Test that pending patch parts before export are skipped with throw_on_pending_patch_parts=false.""" node = cluster.instances["node1"] - mt_table = "pending_patches_skip_mt_table" - s3_table = "pending_patches_skip_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + + mt_table = f"pending_patches_skip_mt_table_{postfix}" + s3_table = f"pending_patches_skip_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table) From ebe2ad1130b9f2a7bb9f4547fed2c39613970831 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 16 Mar 2026 14:33:48 -0300 Subject: [PATCH 7/8] try to fix yet another test --- .../test.py | 40 ++++++++++++++++++- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py index 832739da290a..f3902c82ab22 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py @@ -69,6 +69,42 @@ def wait_for_export_to_start( raise TimeoutError(f"Export did not start within {timeout}s. ") +def wait_for_exception_count( + node, + mt_table: str, + s3_table: str, + partition_id: str, + min_exception_count: int = 1, + timeout: int = 30, + poll_interval: float = 0.5, +): + """Wait for exception_count to reach at least min_exception_count.""" + start_time = time.time() + last_exception_count = None + while time.time() - start_time < timeout: + exception_count_str = node.query( + f""" + SELECT exception_count FROM system.replicated_partition_exports + WHERE source_table = '{mt_table}' + AND destination_table = '{s3_table}' + AND partition_id = '{partition_id}' + """ + ).strip() + + if exception_count_str: + exception_count = int(exception_count_str) + last_exception_count = exception_count + if exception_count >= min_exception_count: + return exception_count + + time.sleep(poll_interval) + + raise TimeoutError( + f"Exception count did not reach {min_exception_count} within {timeout}s. " + f"Last exception_count: {last_exception_count if last_exception_count is not None else 'N/A'}" + ) + + def skip_if_remote_database_disk_enabled(cluster): """Skip test if any instance in the cluster has remote database disk enabled. @@ -558,8 +594,8 @@ def test_inject_short_living_failures(cluster): f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} SETTINGS export_merge_tree_partition_max_retries=100;" ) - # wait only for a second to get at least one failure, but not enough to finish the export - time.sleep(5) + # wait for at least one exception to occur, but not enough to finish the export + wait_for_exception_count(node, mt_table, s3_table, "2020", min_exception_count=1, timeout=30) # wait for the export to finish wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED") From c1d0f747746a05801ad58493cdf431dfbcd77264 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 17 Mar 2026 11:01:30 -0300 Subject: [PATCH 8/8] poll system.part_log instead of system.exports --- ...ge_tree_part_limits_and_table_functions.sh | 40 ++++++++++++++----- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.sh b/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.sh index 16c86569f82b..dff7332662d0 100755 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.sh +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.sh @@ -14,7 +14,14 @@ tf_schema_explicit="tf_schema_explicit_${RANDOM}" mt_table_tf="mt_table_tf_${RANDOM}" query() { - $CLICKHOUSE_CLIENT --query "$1" + local query_text="$1" + local query_id="$2" + + if [ -n "$query_id" ]; then + $CLICKHOUSE_CLIENT --query_id="$query_id" --query "$query_text" + else + $CLICKHOUSE_CLIENT --query "$query_text" + fi } query "DROP TABLE IF EXISTS $big_table, $big_destination_max_bytes, $big_destination_max_rows, $mt_table_tf" @@ -44,16 +51,22 @@ big_part_max_rows=$(query "SELECT name FROM system.parts WHERE database = curren # ALL EXPORTS HAPPEN HERE # ============================================================================ +# Generate unique query_ids for each export to track them in part_log +export_query_id_1="export_${RANDOM}_1" +export_query_id_2="export_${RANDOM}_2" +export_query_id_3="export_${RANDOM}_3" +export_query_id_4="export_${RANDOM}_4" + # this should generate ~4 files -query "ALTER TABLE $big_table EXPORT PART '$big_part_max_bytes' TO TABLE $big_destination_max_bytes SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_max_bytes_per_file=3500000, output_format_parquet_row_group_size_bytes=1000000" +query "ALTER TABLE $big_table EXPORT PART '$big_part_max_bytes' TO TABLE $big_destination_max_bytes SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_max_bytes_per_file=3500000, output_format_parquet_row_group_size_bytes=1000000" "$export_query_id_1" # export_merge_tree_part_max_rows_per_file = 1048576 (which is 4194304/4) to generate 4 files -query "ALTER TABLE $big_table EXPORT PART '$big_part_max_rows' TO TABLE $big_destination_max_rows SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_max_rows_per_file=1048576" +query "ALTER TABLE $big_table EXPORT PART '$big_part_max_rows' TO TABLE $big_destination_max_rows SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_max_rows_per_file=1048576" "$export_query_id_2" echo "---- Table function with schema inheritance (no schema specified)" -query "ALTER TABLE $mt_table_tf EXPORT PART '2022_1_1_0' TO TABLE FUNCTION s3(s3_conn, filename='$tf_schema_inherit', format='Parquet', partition_strategy='hive') PARTITION BY year SETTINGS allow_experimental_export_merge_tree_part = 1" +query "ALTER TABLE $mt_table_tf EXPORT PART '2022_1_1_0' TO TABLE FUNCTION s3(s3_conn, filename='$tf_schema_inherit', format='Parquet', partition_strategy='hive') PARTITION BY year SETTINGS allow_experimental_export_merge_tree_part = 1" "$export_query_id_3" echo "---- Table function with explicit compatible schema" -query "ALTER TABLE $mt_table_tf EXPORT PART '2023_2_2_0' TO TABLE FUNCTION s3(s3_conn, filename='$tf_schema_explicit', format='Parquet', structure='id UInt64, value String, year UInt16', partition_strategy='hive') PARTITION BY year SETTINGS allow_experimental_export_merge_tree_part = 1" +query "ALTER TABLE $mt_table_tf EXPORT PART '2023_2_2_0' TO TABLE FUNCTION s3(s3_conn, filename='$tf_schema_explicit', format='Parquet', structure='id UInt64, value String, year UInt16', partition_strategy='hive') PARTITION BY year SETTINGS allow_experimental_export_merge_tree_part = 1" "$export_query_id_4" # Wait for all exports to complete wait_for_exports() { @@ -65,10 +78,14 @@ wait_for_exports() { echo "Waiting for exports to complete (timeout: ${timeout}s)..." while [ $elapsed -lt $timeout ]; do - # Check if any exports are still in progress for our tables/parts - local active_exports=$(query "SELECT count() FROM system.exports WHERE (source_table = '$big_table' AND part_name IN ('$big_part_max_bytes', '$big_part_max_rows')) OR (source_table = '$mt_table_tf' AND part_name IN ('2022_1_1_0', '2023_2_2_0'))" | tr -d '\n') + # Flush logs to ensure part_log entries are visible + query "SYSTEM FLUSH LOGS" > /dev/null 2>&1 || true + + # Wait for part_log entries - these are written synchronously when export completes + # Check if all expected exports have corresponding part_log entries by query_id + local completed_count=$(query "SELECT count() FROM system.part_log WHERE event_type = 'ExportPart' AND query_id IN ('$export_query_id_1', '$export_query_id_2', '$export_query_id_3', '$export_query_id_4')" | tr -d '\n') - if [ "$active_exports" = "0" ]; then + if [ "$completed_count" = "4" ]; then echo "All exports completed." return 0 fi @@ -78,8 +95,11 @@ wait_for_exports() { done echo "Timeout waiting for exports to complete after ${timeout}s" - echo "Remaining exports:" - query "SELECT source_table, part_name, elapsed, rows_read, total_rows_to_read FROM system.exports WHERE (source_table = '$big_table' AND part_name IN ('$big_part_max_bytes', '$big_part_max_rows')) OR (source_table = '$mt_table_tf' AND part_name IN ('2022_1_1_0', '2023_2_2_0'))" + query "SYSTEM FLUSH LOGS" > /dev/null 2>&1 || true + echo "Completed exports in part_log:" + query "SELECT query_id, table, part_name, event_time FROM system.part_log WHERE event_type = 'ExportPart' AND query_id IN ('$export_query_id_1', '$export_query_id_2', '$export_query_id_3', '$export_query_id_4')" + echo "Remaining exports in system.exports:" + query "SELECT source_table, part_name, elapsed, rows_read, total_rows_to_read FROM system.exports WHERE ((source_table = '$big_table' AND part_name IN ('$big_part_max_bytes', '$big_part_max_rows')) OR (source_table = '$mt_table_tf' AND part_name IN ('2022_1_1_0', '2023_2_2_0')))" return 1 }