Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion docs/en/sql-reference/table-functions/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ y: 993

### Schema evolution {#iceberg-writes-schema-evolution}

ClickHouse allows you to add, drop, or modify columns with simple types (non-tuple, non-array, non-map).
ClickHouse allows you to add, drop, modify, or rename columns with simple types (non-tuple, non-array, non-map).

### Example {#example-iceberg-writes-evolution}

Expand Down Expand Up @@ -479,6 +479,27 @@ Row 1:
──────
x: Ivanov
y: 993

ALTER TABLE iceberg_writes_example RENAME COLUMN y TO value;
SHOW CREATE TABLE iceberg_writes_example;

┌─statement─────────────────────────────────────────────────┐
1. │ CREATE TABLE default.iceberg_writes_example ↴│
│↳( ↴│
│↳ `x` Nullable(String), ↴│
│↳ `value` Nullable(Int64) ↴│
│↳) ↴│
│↳ENGINE = IcebergLocal('/home/scanhex12/iceberg_example/') │
└───────────────────────────────────────────────────────────┘

SELECT *
FROM iceberg_writes_example
FORMAT VERTICAL;

Row 1:
──────
x: Ivanov
value: 993
```

### Compaction {#iceberg-writes-compaction}
Expand Down
116 changes: 95 additions & 21 deletions src/Databases/DataLake/RestCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <IO/Operators.h>
#include <Interpreters/Context.h>

#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
#include <Server/HTTP/HTMLForm.h>
#include <Formats/FormatFactory.h>
Expand All @@ -42,6 +43,8 @@
#include <Poco/Net/SSLManager.h>
#include <Poco/StreamCopier.h>

#include <sstream>


namespace DB::ErrorCodes
{
Expand Down Expand Up @@ -114,6 +117,15 @@ String encodeNamespaceForURI(const String & namespace_name)
return encoded;
}

/// Deep-copies a Poco JSON object by round-tripping it through its
/// textual representation (serialize, then re-parse). Poco offers no
/// native deep-clone, so this is the conventional way to get an
/// independent copy that can be mutated without touching the source.
Poco::JSON::Object::Ptr cloneJsonObject(const Poco::JSON::Object::Ptr & obj)
{
    std::ostringstream serialized; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
    obj->stringify(serialized);

    Poco::JSON::Parser json_parser;
    auto parsed = json_parser.parse(serialized.str());
    return parsed.extract<Poco::JSON::Object::Ptr>();
}

}

std::string RestCatalog::Config::toString() const
Expand Down Expand Up @@ -1305,43 +1317,105 @@ bool RestCatalog::updateMetadata(const String & namespace_name, const String & t
request_body->set("identifier", identifier);
}

if (new_snapshot->has("parent-snapshot-id"))
if (!new_snapshot)
return true;

if (new_snapshot->has(DB::Iceberg::f_schemas))
{
auto parent_snapshot_id = new_snapshot->getValue<Int64>("parent-snapshot-id");
if (parent_snapshot_id != -1)
if (!new_snapshot->has(DB::Iceberg::f_current_schema_id))
return false;

const Int32 new_schema_id = new_snapshot->getValue<Int32>(DB::Iceberg::f_current_schema_id);
const Int32 old_schema_id = new_schema_id - 1;

Poco::JSON::Object::Ptr new_schema_obj;
auto schemas = new_snapshot->getArray(DB::Iceberg::f_schemas);
for (UInt32 i = 0; i < schemas->size(); ++i)
{
auto s = schemas->getObject(i);
if (s->getValue<Int32>(DB::Iceberg::f_schema_id) == new_schema_id)
{
new_schema_obj = s;
break;
}
}
if (!new_schema_obj)
return false;

Poco::JSON::Object::Ptr schema_for_rest = cloneJsonObject(new_schema_obj);
{
Poco::JSON::Array::Ptr identifier_fields = new Poco::JSON::Array;
schema_for_rest->set("identifier-field-ids", identifier_fields);
}

if (old_schema_id >= 0)
{
Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
requirement->set("type", "assert-ref-snapshot-id");
requirement->set("ref", "main");
requirement->set("snapshot-id", parent_snapshot_id);
requirement->set("type", "assert-current-schema-id");
requirement->set("current-schema-id", old_schema_id);

Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
requirements->add(requirement);

request_body->set("requirements", requirements);
}
}

{
Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;

{
Poco::JSON::Object::Ptr add_snapshot = new Poco::JSON::Object;
add_snapshot->set("action", "add-snapshot");
add_snapshot->set("snapshot", new_snapshot);
updates->add(add_snapshot);
Poco::JSON::Object::Ptr add_schema = new Poco::JSON::Object;
add_schema->set("action", "add-schema");
add_schema->set("schema", schema_for_rest);
if (new_snapshot->has(DB::Iceberg::f_last_column_id))
add_schema->set("last-column-id", new_snapshot->getValue<Int32>(DB::Iceberg::f_last_column_id));
updates->add(add_schema);
}
{
Poco::JSON::Object::Ptr set_current_schema = new Poco::JSON::Object;
set_current_schema->set("action", "set-current-schema");
set_current_schema->set("schema-id", new_schema_id);
updates->add(set_current_schema);
}
request_body->set("updates", updates);
}
else
{
if (new_snapshot->has("parent-snapshot-id"))
{
auto parent_snapshot_id = new_snapshot->getValue<Int64>("parent-snapshot-id");
if (parent_snapshot_id != -1)
{
Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
requirement->set("type", "assert-ref-snapshot-id");
requirement->set("ref", "main");
requirement->set("snapshot-id", parent_snapshot_id);

Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
requirements->add(requirement);

request_body->set("requirements", requirements);
}
}

{
Poco::JSON::Object::Ptr set_snapshot = new Poco::JSON::Object;
set_snapshot->set("action", "set-snapshot-ref");
set_snapshot->set("ref-name", "main");
set_snapshot->set("type", "branch");
set_snapshot->set("snapshot-id", new_snapshot->getValue<Int64>("snapshot-id"));
Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;

updates->add(set_snapshot);
{
Poco::JSON::Object::Ptr add_snapshot = new Poco::JSON::Object;
add_snapshot->set("action", "add-snapshot");
add_snapshot->set("snapshot", new_snapshot);
updates->add(add_snapshot);
}

{
Poco::JSON::Object::Ptr set_snapshot = new Poco::JSON::Object;
set_snapshot->set("action", "set-snapshot-ref");
set_snapshot->set("ref-name", "main");
set_snapshot->set("type", "branch");
set_snapshot->set("snapshot-id", new_snapshot->getValue<Int64>("snapshot-id"));

updates->add(set_snapshot);
}
request_body->set("updates", updates);
}
request_body->set("updates", updates);
}

try
Expand Down
15 changes: 8 additions & 7 deletions src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,15 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl

void checkAlterIsPossible(const AlterCommands & commands) override
{
assertInitializedDL();
current_metadata->checkAlterIsPossible(commands);
if(current_metadata)
current_metadata->checkAlterIsPossible(commands);
}

void alter(const AlterCommands & params, ContextPtr context) override
void alter(const AlterCommands & params, ContextPtr context,
const StorageID & storage_id, std::shared_ptr<DataLake::ICatalog> catalog) override
{
assertInitializedDL();
current_metadata->alter(params, context);

current_metadata->alter(params, shared_from_this(), context, storage_id, catalog);
}

ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly) override
Expand Down Expand Up @@ -451,7 +451,6 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl

void assertInitializedDL() const
{
BaseStorageConfiguration::assertInitialized();
if (!current_metadata)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Metadata is not initialized");
}
Expand Down Expand Up @@ -675,7 +674,9 @@ class StorageIcebergConfiguration : public StorageObjectStorageConfiguration, pu

void checkAlterIsPossible(const AlterCommands & commands) override { getImpl().checkAlterIsPossible(commands); }

void alter(const AlterCommands & params, ContextPtr context) override { getImpl().alter(params, context); }
void alter(const AlterCommands & params, ContextPtr context,
const StorageID & storage_id, std::shared_ptr<DataLake::ICatalog> catalog) override
{ getImpl().alter(params, context, storage_id, catalog); }

const DataLakeStorageSettings & getDataLakeSettings() const override { return getImpl().getDataLakeSettings(); }

Expand Down
7 changes: 6 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,12 @@ class IDataLakeMetadata : boost::noncopyable

virtual void addDeleteTransformers(ObjectInfoPtr, QueryPipelineBuilder &, const std::optional<FormatSettings> &, FormatParserSharedResourcesPtr, ContextPtr) const { }
virtual void checkAlterIsPossible(const AlterCommands & /*commands*/) { throwNotImplemented("alter"); }
virtual void alter(const AlterCommands & /*params*/, ContextPtr /*context*/) { throwNotImplemented("alter"); }
virtual void alter(
const AlterCommands & /*params*/,
StorageObjectStorageConfigurationPtr /*configuration*/,
ContextPtr /*context*/,
const StorageID & /*storage_id*/,
std::shared_ptr<DataLake::ICatalog> /*catalog*/) { throwNotImplemented("alter"); }
virtual void drop(ContextPtr) { }

virtual std::optional<String> partitionKey(ContextPtr) const { return {}; }
Expand Down
15 changes: 12 additions & 3 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,12 +717,17 @@ void IcebergMetadata::checkAlterIsPossible(const AlterCommands & commands)
for (const auto & command : commands)
{
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::DROP_COLUMN
&& command.type != AlterCommand::Type::MODIFY_COLUMN)
&& command.type != AlterCommand::Type::MODIFY_COLUMN && command.type != AlterCommand::Type::RENAME_COLUMN)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by Iceberg storage", command.type);
}
}

void IcebergMetadata::alter(const AlterCommands & params, ContextPtr context)
void IcebergMetadata::alter(
const AlterCommands & params,
StorageObjectStorageConfigurationPtr configuration,
ContextPtr context,
const StorageID & storage_id,
std::shared_ptr<DataLake::ICatalog> catalog)
{
if (!context->getSettingsRef()[Setting::allow_experimental_insert_into_iceberg].value)
{
Expand All @@ -732,7 +737,11 @@ void IcebergMetadata::alter(const AlterCommands & params, ContextPtr context)
"To allow its usage, enable setting allow_experimental_insert_into_iceberg");
}

Iceberg::alter(params, context, object_storage, data_lake_settings, persistent_components, write_format);
Iceberg::alter(
params, context, object_storage, data_lake_settings, persistent_components, write_format,
storage_id, catalog,
configuration ? configuration->getTypeName() : "",
configuration ? configuration->getNamespace() : "");
}

void IcebergMetadata::createInitial(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,12 @@ class IcebergMetadata : public IDataLakeMetadata
void modifyFormatSettings(FormatSettings & format_settings, const Context & local_context) const override;
void addDeleteTransformers(ObjectInfoPtr object_info, QueryPipelineBuilder & builder, const std::optional<FormatSettings> & format_settings, FormatParserSharedResourcesPtr parser_shared_resources, ContextPtr local_context) const override;
void checkAlterIsPossible(const AlterCommands & commands) override;
void alter(const AlterCommands & params, ContextPtr context) override;
void alter(
const AlterCommands & params,
StorageObjectStorageConfigurationPtr configuration,
ContextPtr context,
const StorageID & storage_id,
std::shared_ptr<DataLake::ICatalog> catalog) override;

ObjectIterator iterate(
const ActionsDAG * filter_dag,
Expand Down
47 changes: 47 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,53 @@ void MetadataGenerator::generateModifyColumnMetadata(const String & column_name,
metadata_object->getArray(Iceberg::f_schemas)->add(current_schema);
}

void MetadataGenerator::generateRenameColumnMetadata(const String & column_name, const String & new_column_name)
{
auto current_schema_id = metadata_object->getValue<Int32>(Iceberg::f_current_schema_id);

Poco::JSON::Object::Ptr current_schema;
auto schemas = metadata_object->getArray(Iceberg::f_schemas);
for (UInt32 i = 0; i < schemas->size(); ++i)
{
if (schemas->getObject(i)->getValue<Int32>(Iceberg::f_schema_id) == current_schema_id)
{
current_schema = schemas->getObject(i);
break;
}
}

if (!current_schema)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Not found schema with id {}", current_schema_id);
current_schema = deepCopy(current_schema);

auto schema_fields = current_schema->getArray(Iceberg::f_fields);

for (UInt32 i = 0; i < schema_fields->size(); ++i)
{
if (schema_fields->getObject(i)->getValue<String>(Iceberg::f_name) == new_column_name)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Column {} already exists", new_column_name);
}

bool found = false;
for (UInt32 i = 0; i < schema_fields->size(); ++i)
{
auto current_field = schema_fields->getObject(i);
if (current_field->getValue<String>(Iceberg::f_name) == column_name)
{
current_field->set(Iceberg::f_name, new_column_name);
found = true;
break;
}
}

if (!found)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Not found column {}", column_name);

metadata_object->set(Iceberg::f_current_schema_id, current_schema_id + 1);
current_schema->set(Iceberg::f_schema_id, current_schema_id + 1);
metadata_object->getArray(Iceberg::f_schemas)->add(current_schema);
}

}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class MetadataGenerator
void generateAddColumnMetadata(const String & column_name, DataTypePtr type);
void generateDropColumnMetadata(const String & column_name);
void generateModifyColumnMetadata(const String & column_name, DataTypePtr type);
void generateRenameColumnMetadata(const String & column_name, const String & new_column_name);

private:
Poco::JSON::Object::Ptr metadata_object;
Expand Down
Loading
Loading