Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 63 additions & 21 deletions src/Storages/IPartitionStrategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ extern const int BAD_ARGUMENTS;

namespace
{
using PartitionExpressionActionsAndColumnName = IPartitionStrategy::PartitionExpressionActionsAndColumnName;

/// Builds AST for hive partition path format
/// `partition_column_1=toString(partition_value_expr_1)/ ... /partition_column_N=toString(partition_value_expr_N)/`
/// for given partition columns list and a partition by AST.
Expand Down Expand Up @@ -89,6 +91,33 @@ namespace
return result;
}

/// Wraps the PARTITION BY expression in `toString(...)` so the partition key
/// is rendered as a single string column.
ASTPtr buildToStringPartitionAST(ASTPtr partition_by)
{
    ASTs to_string_args;
    to_string_args.push_back(std::move(partition_by));
    return makeASTFunction("toString", std::move(to_string_args));
}

/// Returns the previously cached partition expression actions when available;
/// otherwise builds a fresh AST via `build_ast` and compiles actions for it
/// through the strategy. The cache itself is not modified here.
template <typename BuildAST>
PartitionExpressionActionsAndColumnName getCachedOrBuildActions(
    const std::optional<PartitionExpressionActionsAndColumnName> & cached_result,
    const IPartitionStrategy & partition_strategy,
    BuildAST && build_ast)
{
    if (cached_result.has_value())
        return cached_result.value();

    /// Must be an lvalue: getPartitionExpressionActions takes ASTPtr by reference.
    ASTPtr freshly_built_ast = build_ast();
    return partition_strategy.getPartitionExpressionActions(freshly_built_ast);
}

/// Stores `actions_with_column` into `cached_result`, but only when the
/// compiled actions DAG contains no non-deterministic functions; otherwise the
/// cache is left untouched so the expression is re-evaluated on each use.
void cacheDeterministicActions(
    std::optional<PartitionExpressionActionsAndColumnName> & cached_result,
    const PartitionExpressionActionsAndColumnName & actions_with_column)
{
    const bool is_deterministic = !actions_with_column.actions->getActionsDAG().hasNonDeterministic();
    if (is_deterministic)
        cached_result = actions_with_column;
}

std::shared_ptr<IPartitionStrategy> createHivePartitionStrategy(
ASTPtr partition_by,
const Block & sample_block,
Expand Down Expand Up @@ -191,11 +220,8 @@ const KeyDescription & IPartitionStrategy::getPartitionKeyDescription() const
}

IPartitionStrategy::PartitionExpressionActionsAndColumnName
IPartitionStrategy::getPartitionExpressionActions(ASTPtr & expression_ast)
IPartitionStrategy::getPartitionExpressionActions(ASTPtr & expression_ast) const
{
if (cached_result)
return *cached_result;

auto syntax_result = TreeRewriter(context).analyze(expression_ast, sample_block.getNamesAndTypesList());
auto actions_dag = ExpressionAnalyzer(expression_ast, syntax_result, context).getActionsDAG(false);

Expand All @@ -204,9 +230,6 @@ IPartitionStrategy::getPartitionExpressionActions(ASTPtr & expression_ast)
std::move(actions_dag), ExpressionActionsSettings(context), false);
result.column_name = expression_ast->getColumnName();

if (!result.actions->getActionsDAG().hasNonDeterministic())
cached_result = result;

return result;
}

Expand Down Expand Up @@ -259,13 +282,19 @@ std::shared_ptr<IPartitionStrategy> PartitionStrategyFactory::get(StrategyType s
/// Eagerly compiles the `toString(partition_by)` expression actions at
/// construction time and caches them when deterministic, so subsequent
/// computePartitionKey() calls can reuse them instead of rebuilding.
WildcardPartitionStrategy::WildcardPartitionStrategy(KeyDescription partition_key_description_, const Block & sample_block_, ContextPtr context_)
: IPartitionStrategy(partition_key_description_, sample_block_, context_)
{
/// cached_result starts empty here, so this always builds the actions once.
auto actions_with_column = getCachedOrBuildActions(
cached_result,
*this,
[&] { return buildToStringPartitionAST(partition_key_description.definition_ast); });
/// Only deterministic expressions are stored in the cache (see
/// cacheDeterministicActions: it checks hasNonDeterministic()).
cacheDeterministicActions(cached_result, actions_with_column);
}

ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk)
ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk) const
{
ASTs arguments(1, partition_key_description.definition_ast);
ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments));
auto actions_with_column = getPartitionExpressionActions(partition_by_string);
auto actions_with_column = getCachedOrBuildActions(
cached_result,
*this,
[&] { return buildToStringPartitionAST(partition_key_description.definition_ast); });

Block block_with_partition_by_expr = sample_block.cloneWithoutColumns();
block_with_partition_by_expr.setColumns(chunk.getColumns());
Expand All @@ -274,11 +303,13 @@ ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk)
return block_with_partition_by_expr.getByName(actions_with_column.column_name).column;
}

ColumnPtr WildcardPartitionStrategy::computePartitionKey(Block & block)
ColumnPtr WildcardPartitionStrategy::computePartitionKey(Block & block) const
{
ASTs arguments(1, partition_key_description.definition_ast);
ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments));
auto actions_with_column = getPartitionExpressionActions(partition_by_string);
auto actions_with_column = getCachedOrBuildActions(
cached_result,
*this,
[&] { return buildToStringPartitionAST(partition_key_description.definition_ast); });

actions_with_column.actions->execute(block);
return block.getByName(actions_with_column.column_name).column;
}
Expand All @@ -300,12 +331,20 @@ HiveStylePartitionStrategy::HiveStylePartitionStrategy(
}

block_without_partition_columns = buildBlockWithoutPartitionColumns(sample_block, partition_columns_name_set);

auto actions_with_column = getCachedOrBuildActions(
cached_result,
*this,
[&] { return buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); });
cacheDeterministicActions(cached_result, actions_with_column);
}

ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk)
ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk) const
{
auto hive_ast = buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns());
auto actions_with_column = getPartitionExpressionActions(hive_ast);
auto actions_with_column = getCachedOrBuildActions(
cached_result,
*this,
[&] { return buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); });

Block block_with_partition_by_expr = sample_block.cloneWithoutColumns();
block_with_partition_by_expr.setColumns(chunk.getColumns());
Expand All @@ -314,10 +353,13 @@ ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk)
return block_with_partition_by_expr.getByName(actions_with_column.column_name).column;
}

ColumnPtr HiveStylePartitionStrategy::computePartitionKey(Block & block)
ColumnPtr HiveStylePartitionStrategy::computePartitionKey(Block & block) const
{
auto hive_ast = buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns());
auto actions_with_column = getPartitionExpressionActions(hive_ast);
auto actions_with_column = getCachedOrBuildActions(
cached_result,
*this,
[&] { return buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); });

actions_with_column.actions->execute(block);
return block.getByName(actions_with_column.column_name).column;
}
Expand Down
14 changes: 7 additions & 7 deletions src/Storages/IPartitionStrategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ struct IPartitionStrategy

virtual ~IPartitionStrategy() = default;

virtual ColumnPtr computePartitionKey(const Chunk & chunk) = 0;
virtual ColumnPtr computePartitionKey(const Chunk & chunk) const = 0;

virtual ColumnPtr computePartitionKey(Block & block) = 0;
virtual ColumnPtr computePartitionKey(Block & block) const = 0;

virtual ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk)
{
Expand All @@ -48,7 +48,7 @@ struct IPartitionStrategy
NamesAndTypesList getPartitionColumns() const;
const KeyDescription & getPartitionKeyDescription() const;

PartitionExpressionActionsAndColumnName getPartitionExpressionActions(ASTPtr & expression_ast);
PartitionExpressionActionsAndColumnName getPartitionExpressionActions(ASTPtr & expression_ast) const;

protected:
const KeyDescription partition_key_description;
Expand Down Expand Up @@ -91,9 +91,9 @@ struct WildcardPartitionStrategy : IPartitionStrategy
{
WildcardPartitionStrategy(KeyDescription partition_key_description_, const Block & sample_block_, ContextPtr context_);

ColumnPtr computePartitionKey(const Chunk & chunk) override;
ColumnPtr computePartitionKey(const Chunk & chunk) const override;

ColumnPtr computePartitionKey(Block & block) override;
ColumnPtr computePartitionKey(Block & block) const override;
};

/*
Expand All @@ -110,9 +110,9 @@ struct HiveStylePartitionStrategy : IPartitionStrategy
const std::string & file_format_,
bool partition_columns_in_data_file_);

ColumnPtr computePartitionKey(const Chunk & chunk) override;
ColumnPtr computePartitionKey(const Chunk & chunk) const override;

ColumnPtr computePartitionKey(Block & block) override;
ColumnPtr computePartitionKey(Block & block) const override;

ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk) override;
Block getFormatHeader() override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ def test_drop_column_during_export_snapshot(cluster):
skip_if_remote_database_disk_enabled(cluster)
node = cluster.instances["node1"]

mt_table = "mutations_snapshot_mt_table"
s3_table = "mutations_snapshot_s3_table"
postfix = str(uuid.uuid4()).replace("-", "_")

mt_table = f"mutations_snapshot_mt_table_{postfix}"
s3_table = f"mutations_snapshot_s3_table_{postfix}"

create_tables_and_insert_data(node, mt_table, s3_table)

Expand Down Expand Up @@ -104,8 +106,10 @@ def test_add_column_during_export(cluster):
skip_if_remote_database_disk_enabled(cluster)
node = cluster.instances["node1"]

mt_table = "add_column_during_export_mt_table"
s3_table = "add_column_during_export_s3_table"
postfix = str(uuid.uuid4()).replace("-", "_")

mt_table = f"add_column_during_export_mt_table_{postfix}"
s3_table = f"add_column_during_export_s3_table_{postfix}"

create_tables_and_insert_data(node, mt_table, s3_table)

Expand Down Expand Up @@ -157,8 +161,10 @@ def test_pending_mutations_throw_before_export(cluster):
"""Test that pending mutations before export throw an error with default settings."""
node = cluster.instances["node1"]

mt_table = "pending_mutations_throw_mt_table"
s3_table = "pending_mutations_throw_s3_table"
postfix = str(uuid.uuid4()).replace("-", "_")

mt_table = f"pending_mutations_throw_mt_table_{postfix}"
s3_table = f"pending_mutations_throw_s3_table_{postfix}"

create_tables_and_insert_data(node, mt_table, s3_table)

Expand All @@ -180,8 +186,10 @@ def test_pending_mutations_skip_before_export(cluster):
"""Test that pending mutations before export are skipped with throw_on_pending_mutations=false."""
node = cluster.instances["node1"]

mt_table = "pending_mutations_skip_mt_table"
s3_table = "pending_mutations_skip_s3_table"
postfix = str(uuid.uuid4()).replace("-", "_")

mt_table = f"pending_mutations_skip_mt_table_{postfix}"
s3_table = f"pending_mutations_skip_s3_table_{postfix}"

create_tables_and_insert_data(node, mt_table, s3_table)

Expand Down Expand Up @@ -210,8 +218,10 @@ def test_data_mutations_after_export_started(cluster):
skip_if_remote_database_disk_enabled(cluster)
node = cluster.instances["node1"]

mt_table = "mutations_after_export_mt_table"
s3_table = "mutations_after_export_s3_table"
postfix = str(uuid.uuid4()).replace("-", "_")

mt_table = f"mutations_after_export_mt_table_{postfix}"
s3_table = f"mutations_after_export_s3_table_{postfix}"

create_tables_and_insert_data(node, mt_table, s3_table)

Expand Down Expand Up @@ -256,8 +266,10 @@ def test_pending_patch_parts_throw_before_export(cluster):
"""Test that pending patch parts before export throw an error with default settings."""
node = cluster.instances["node1"]

mt_table = "pending_patches_throw_mt_table"
s3_table = "pending_patches_throw_s3_table"
postfix = str(uuid.uuid4()).replace("-", "_")

mt_table = f"pending_patches_throw_mt_table_{postfix}"
s3_table = f"pending_patches_throw_s3_table_{postfix}"

create_tables_and_insert_data(node, mt_table, s3_table)

Expand All @@ -279,8 +291,10 @@ def test_pending_patch_parts_skip_before_export(cluster):
"""Test that pending patch parts before export are skipped with throw_on_pending_patch_parts=false."""
node = cluster.instances["node1"]

mt_table = "pending_patches_skip_mt_table"
s3_table = "pending_patches_skip_s3_table"
postfix = str(uuid.uuid4()).replace("-", "_")

mt_table = f"pending_patches_skip_mt_table_{postfix}"
s3_table = f"pending_patches_skip_s3_table_{postfix}"

create_tables_and_insert_data(node, mt_table, s3_table)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,42 @@ def wait_for_export_to_start(
raise TimeoutError(f"Export did not start within {timeout}s. ")


def wait_for_exception_count(
    node,
    mt_table: str,
    s3_table: str,
    partition_id: str,
    min_exception_count: int = 1,
    timeout: int = 30,
    poll_interval: float = 0.5,
):
    """Poll system.replicated_partition_exports until exception_count reaches
    at least min_exception_count for the given export, returning the observed
    count.

    Args:
        node: cluster instance the queries run on.
        mt_table: source (MergeTree) table name.
        s3_table: destination table name.
        partition_id: partition id of the export being watched.
        min_exception_count: threshold that must be reached.
        timeout: maximum seconds to wait.
        poll_interval: seconds to sleep between polls.

    Returns:
        The exception_count once it is >= min_exception_count.

    Raises:
        TimeoutError: if the threshold is not reached within `timeout` seconds.
    """
    # Use a monotonic clock for the deadline so wall-clock adjustments
    # (e.g. NTP jumps) cannot stretch or shorten the effective timeout.
    deadline = time.monotonic() + timeout
    last_exception_count = None
    while time.monotonic() < deadline:
        # NOTE(review): identifiers are interpolated directly into SQL;
        # acceptable for test fixtures with trusted table names.
        exception_count_str = node.query(
            f"""
            SELECT exception_count FROM system.replicated_partition_exports
            WHERE source_table = '{mt_table}'
            AND destination_table = '{s3_table}'
            AND partition_id = '{partition_id}'
            """
        ).strip()

        if exception_count_str:
            # Empty result means the export row does not exist yet.
            last_exception_count = int(exception_count_str)
            if last_exception_count >= min_exception_count:
                return last_exception_count

        time.sleep(poll_interval)

    raise TimeoutError(
        f"Exception count did not reach {min_exception_count} within {timeout}s. "
        f"Last exception_count: {last_exception_count if last_exception_count is not None else 'N/A'}"
    )


def skip_if_remote_database_disk_enabled(cluster):
"""Skip test if any instance in the cluster has remote database disk enabled.

Expand Down Expand Up @@ -558,8 +594,8 @@ def test_inject_short_living_failures(cluster):
f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} SETTINGS export_merge_tree_partition_max_retries=100;"
)

# wait only for a second to get at least one failure, but not enough to finish the export
time.sleep(5)
# wait for at least one exception to occur, but not enough to finish the export
wait_for_exception_count(node, mt_table, s3_table, "2020", min_exception_count=1, timeout=30)

# wait for the export to finish
wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
---- Test max_bytes and max_rows per file
---- Table function with schema inheritance (no schema specified)
---- Table function with explicit compatible schema
Waiting for exports to complete (timeout: 60s)...
All exports completed.
---- Count files in big_destination_max_bytes, should be 5 (4 parquet, 1 commit)
5
---- Count rows in big_table and big_destination_max_bytes
Expand Down
Loading
Loading