Skip to content
Merged
57 changes: 42 additions & 15 deletions docs/en/engines/database-engines/datalake.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,22 @@ catalog_type,

The following settings are supported:

| Setting | Description |
|-------------------------|-----------------------------------------------------------------------------------------|
| `catalog_type` | Type of catalog: `glue`, `unity` (Delta), `rest` (Iceberg), `hive`, `onelake` (Iceberg) |
| `warehouse` | The warehouse/database name to use in the catalog. |
| `catalog_credential` | Authentication credential for the catalog (e.g., API key or token) |
| `auth_header` | Custom HTTP header for authentication with the catalog service |
| `auth_scope` | OAuth2 scope for authentication (if using OAuth) |
| `storage_endpoint` | Endpoint URL for the underlying storage |
| `oauth_server_uri` | URI of the OAuth2 authorization server for authentication |
| Setting | Description |
|-------------------------|-----------------------------------------------------------------------------------------------|
| `catalog_type` | Type of catalog: `glue`, `unity` (Delta), `rest` (Iceberg), `hive`, `onelake` (Iceberg) |
| `warehouse` | The warehouse/database name to use in the catalog. |
| `catalog_credential` | Authentication credential for the catalog (e.g., API key or token) |
| `auth_header` | Custom HTTP header for authentication with the catalog service |
| `auth_scope` | OAuth2 scope for authentication (if using OAuth) |
| `storage_endpoint` | Endpoint URL for the underlying storage |
| `oauth_server_uri` | URI of the OAuth2 authorization server for authentication |
| `vended_credentials` | Boolean indicating whether to use vended credentials from the catalog (supports AWS S3 and Azure ADLS Gen2) |
| `aws_access_key_id` | AWS access key ID for S3/Glue access (if not using vended credentials) |
| `aws_secret_access_key` | AWS secret access key for S3/Glue access (if not using vended credentials) |
| `region` | AWS region for the service (e.g., `us-east-1`) |
| `dlf_access_key_id` | Access key ID for DLF access |
| `dlf_access_key_secret` | Access key Secret for DLF access |
| `aws_access_key_id` | AWS access key ID for S3/Glue access (if not using vended credentials) |
| `aws_secret_access_key` | AWS secret access key for S3/Glue access (if not using vended credentials) |
| `region` | AWS region for the service (e.g., `us-east-1`) |
| `dlf_access_key_id` | Access key ID for DLF access |
| `dlf_access_key_secret` | Access key Secret for DLF access |
| `namespaces` | Comma-separated list of namespaces, implemented for catalog types: `rest`, `glue` and `unity` |

## Examples {#examples}

Expand All @@ -83,4 +84,30 @@ SETTINGS
onelake_client_secret = client_secret;
SHOW TABLES IN database_name;
SELECT count() FROM database_name.table_name;
```
```

## Namespace filter {#namespace}

By default, ClickHouse reads tables from all namespaces available in the catalog. You can limit this behavior using the `namespaces` database setting. The value should be a comma-separated list of namespaces that are allowed to be read.

Supported catalog types are `rest`, `glue` and `unity`.

For example, if the catalog contains three namespaces - `dev`, `stage`, and `prod` - and you want to read data only from `dev` and `stage`, set:
```
namespaces='dev,stage'
```

### Nested namespaces {#namespace-nested}

The Iceberg (`rest`) catalog supports nested namespaces. The `namespaces` filter accepts the following patterns:

- `namespace` - includes tables from the specified namespace, but not from its nested namespaces.
- `namespace.nested` - includes tables from the nested namespace, but not from the parent.
- `namespace.*` - includes tables from all nested namespaces, but not from the parent.

If you need to include both a namespace and its nested namespaces, specify both explicitly. For example:
```
namespaces='namespace,namespace.*'
```

The default value is `*`, which means all namespaces are included.
1 change: 1 addition & 0 deletions src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@
M(763, SESSION_REFUSED) \
M(764, DEDUPLICATION_IS_NOT_POSSIBLE) \
M(765, UNKNOWN_MASKING_POLICY) \
M(766, CATALOG_NAMESPACE_DISABLED) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \
Expand Down
28 changes: 17 additions & 11 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ namespace DatabaseDataLakeSetting
extern const DatabaseDataLakeSettingsString onelake_client_secret;
extern const DatabaseDataLakeSettingsString dlf_access_key_id;
extern const DatabaseDataLakeSettingsString dlf_access_key_secret;
extern const DatabaseDataLakeSettingsString namespaces;
extern const DatabaseDataLakeSettingsString google_project_id;
extern const DatabaseDataLakeSettingsString google_service_account;
extern const DatabaseDataLakeSettingsString google_metadata_service;
Expand Down Expand Up @@ -164,8 +165,9 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
.aws_access_key_id = settings[DatabaseDataLakeSetting::aws_access_key_id].value,
.aws_secret_access_key = settings[DatabaseDataLakeSetting::aws_secret_access_key].value,
.region = settings[DatabaseDataLakeSetting::region].value,
.namespaces = settings[DatabaseDataLakeSetting::namespaces].value,
.aws_role_arn = settings[DatabaseDataLakeSetting::aws_role_arn].value,
.aws_role_session_name = settings[DatabaseDataLakeSetting::aws_role_session_name].value,
.aws_role_session_name = settings[DatabaseDataLakeSetting::aws_role_session_name].value
};

switch (settings[DatabaseDataLakeSetting::catalog_type].value)
Expand All @@ -180,6 +182,7 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
settings[DatabaseDataLakeSetting::auth_header],
settings[DatabaseDataLakeSetting::oauth_server_uri].value,
settings[DatabaseDataLakeSetting::oauth_server_use_request_body].value,
settings[DatabaseDataLakeSetting::namespaces].value,
Context::getGlobalContextInstance());
break;
}
Expand All @@ -194,6 +197,7 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
settings[DatabaseDataLakeSetting::auth_scope].value,
settings[DatabaseDataLakeSetting::oauth_server_uri].value,
settings[DatabaseDataLakeSetting::oauth_server_use_request_body].value,
settings[DatabaseDataLakeSetting::namespaces].value,
Context::getGlobalContextInstance());
break;
}
Expand Down Expand Up @@ -263,6 +267,7 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
google_adc_client_secret,
google_adc_refresh_token,
google_adc_quota_project_id,
settings[DatabaseDataLakeSetting::namespaces].value,
Context::getGlobalContextInstance());
break;
}
Expand All @@ -272,6 +277,7 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
settings[DatabaseDataLakeSetting::warehouse].value,
url,
settings[DatabaseDataLakeSetting::catalog_credential].value,
settings[DatabaseDataLakeSetting::namespaces].value,
Context::getGlobalContextInstance());
break;
}
Expand Down Expand Up @@ -368,24 +374,24 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
#if USE_AWS_S3
case DB::DatabaseDataLakeStorageType::S3:
{
return std::make_shared<StorageS3IcebergConfiguration>(storage_settings);
return std::make_shared<StorageS3IcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#endif
#if USE_AZURE_BLOB_STORAGE
case DB::DatabaseDataLakeStorageType::Azure:
{
return std::make_shared<StorageAzureIcebergConfiguration>(storage_settings);
return std::make_shared<StorageAzureIcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#endif
#if USE_HDFS
case DB::DatabaseDataLakeStorageType::HDFS:
{
return std::make_shared<StorageHDFSIcebergConfiguration>(storage_settings);
return std::make_shared<StorageHDFSIcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#endif
case DB::DatabaseDataLakeStorageType::Local:
{
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings);
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
/// Fake storage in case when catalog store not only
/// primary-type tables (DeltaLake or Iceberg), but for
Expand All @@ -397,7 +403,7 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
/// dependencies and the most lightweight
case DB::DatabaseDataLakeStorageType::Other:
{
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings);
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#if !USE_AWS_S3 || !USE_AZURE_BLOB_STORAGE || !USE_HDFS
default:
Expand All @@ -414,7 +420,7 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
#if USE_AWS_S3
case DB::DatabaseDataLakeStorageType::S3:
{
return std::make_shared<StorageS3DeltaLakeConfiguration>(storage_settings);
return std::make_shared<StorageS3DeltaLakeConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#endif
#if USE_AZURE_BLOB_STORAGE
Expand All @@ -425,7 +431,7 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
#endif
case DB::DatabaseDataLakeStorageType::Local:
{
return std::make_shared<StorageLocalDeltaLakeConfiguration>(storage_settings);
return std::make_shared<StorageLocalDeltaLakeConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
/// Fake storage in case when catalog store not only
/// primary-type tables (DeltaLake or Iceberg), but for
Expand All @@ -437,7 +443,7 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
/// dependencies and the most lightweight
case DB::DatabaseDataLakeStorageType::Other:
{
return std::make_shared<StorageLocalDeltaLakeConfiguration>(storage_settings);
return std::make_shared<StorageLocalDeltaLakeConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS,
Expand All @@ -452,12 +458,12 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
#if USE_AWS_S3
case DB::DatabaseDataLakeStorageType::S3:
{
return std::make_shared<StorageS3IcebergConfiguration>(storage_settings);
return std::make_shared<StorageS3IcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
#endif
case DB::DatabaseDataLakeStorageType::Other:
{
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings);
return std::make_shared<StorageLocalIcebergConfiguration>(storage_settings, settings[DatabaseDataLakeSetting::namespaces].value);
}
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS,
Expand Down
1 change: 1 addition & 0 deletions src/Databases/DataLake/DatabaseDataLakeSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ namespace ErrorCodes
DECLARE(String, google_adc_credentials_file, "", "Path to JSON file containing Google Application Default Credentials. If set, credentials will be loaded from this file. File should contain: type, client_id, client_secret, refresh_token, and optionally quota_project_id", 0) \
DECLARE(String, dlf_access_key_id, "", "Access id of DLF token for Paimon REST Catalog", 0) \
DECLARE(String, dlf_access_key_secret, "", "Access secret of DLF token for Paimon REST Catalog", 0) \
DECLARE(String, namespaces, "*", "Comma-separated list of allowed namespaces", 0) \

#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \
DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) \
Expand Down
29 changes: 26 additions & 3 deletions src/Databases/DataLake/GlueCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ namespace DB::ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int DATALAKE_DATABASE_ERROR;
extern const int CATALOG_NAMESPACE_DISABLED;
}

namespace DB::Setting
Expand Down Expand Up @@ -163,9 +164,9 @@ GlueCatalog::GlueCatalog(
LOG_TRACE(log, "Creating AWS glue client with credentials empty {}, region '{}', endpoint '{}'", credentials.IsEmpty(), region, endpoint);
}

boost::split(allowed_namespaces, settings.namespaces, boost::is_any_of(", "), boost::token_compress_on);
credentials_provider = DB::S3::getCredentialsProvider(poco_config, credentials, creds_config);
glue_client = std::make_unique<Aws::Glue::GlueClient>(credentials_provider, endpoint_provider, client_configuration);

}

GlueCatalog::~GlueCatalog() = default;
Expand All @@ -191,8 +192,9 @@ DataLake::ICatalog::Namespaces GlueCatalog::getDatabases(const std::string & pre
for (const auto & db : dbs)
{
const auto & db_name = db.GetName();
if (!db_name.starts_with(prefix))
if (!isNamespaceAllowed(db_name) || !db_name.starts_with(prefix))
continue;

result.push_back(db_name);
if (limit != 0 && result.size() >= limit)
break;
Expand Down Expand Up @@ -272,6 +274,9 @@ DB::Names GlueCatalog::getTables() const

bool GlueCatalog::existsTable(const std::string & database_name, const std::string & table_name) const
{
if (!isNamespaceAllowed(database_name))
throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, "Namespace {} is filtered by `namespaces` database parameter", database_name);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: I'd rather use a forward declaration of this error code instead of using the fully qualified name, but this is cosmetics.

(applies to all added throw statements below as well)


Aws::Glue::Model::GetTableRequest request;
request.SetDatabaseName(database_name);
request.SetName(table_name);
Expand All @@ -286,6 +291,9 @@ bool GlueCatalog::tryGetTableMetadata(
DB::ContextPtr /* context_ */,
TableMetadata & result) const
{
if (!isNamespaceAllowed(database_name))
throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED, "Namespace {} is filtered by `namespaces` database parameter", database_name);

Aws::Glue::Model::GetTableRequest request;
request.SetDatabaseName(database_name);
request.SetName(table_name);
Expand Down Expand Up @@ -510,7 +518,7 @@ GlueCatalog::ObjectStorageWithPath GlueCatalog::createObjectStorageForEarlyTable

auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
storage_settings->loadFromSettingsChanges(settings.allChanged());
auto configuration = std::make_shared<DB::StorageS3IcebergConfiguration>(storage_settings);
auto configuration = std::make_shared<DB::StorageS3IcebergConfiguration>(storage_settings, settings.namespaces);
configuration->initialize(args, getContext(), false);

auto object_storage = configuration->createObjectStorage(getContext(), true);
Expand Down Expand Up @@ -578,6 +586,11 @@ void GlueCatalog::createNamespaceIfNotExists(const String & namespace_name) cons

void GlueCatalog::createTable(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr /*metadata_content*/) const
{
if (!isNamespaceAllowed(namespace_name))
throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED,
"Failed to create table {}, namespace {} is filtered by `namespaces` database parameter",
table_name, namespace_name);

createNamespaceIfNotExists(namespace_name);

Aws::Glue::Model::CreateTableRequest request;
Expand Down Expand Up @@ -650,6 +663,11 @@ bool GlueCatalog::updateMetadata(const String & namespace_name, const String & t

void GlueCatalog::dropTable(const String & namespace_name, const String & table_name) const
{
if (!isNamespaceAllowed(namespace_name))
throw DB::Exception(DB::ErrorCodes::CATALOG_NAMESPACE_DISABLED,
"Failed to drop table {}, namespace {} is filtered by `namespaces` database parameter",
table_name, namespace_name);

Aws::Glue::Model::DeleteTableRequest request;
request.SetDatabaseName(namespace_name);
request.SetName(table_name);
Expand All @@ -663,6 +681,11 @@ void GlueCatalog::dropTable(const String & namespace_name, const String & table_
response.GetError().GetMessage());
}

/// Checks whether `namespace_` passes the filter configured via the `namespaces`
/// database setting: allowed when the filter contains the wildcard "*"
/// (the setting's default, meaning every namespace is allowed) or lists this
/// exact namespace name. Note: matching is exact — Glue databases are flat,
/// so no nested-namespace patterns apply here.
bool GlueCatalog::isNamespaceAllowed(const std::string & namespace_) const
{
return allowed_namespaces.contains("*") || allowed_namespaces.contains(namespace_);
}

}

#endif
3 changes: 3 additions & 0 deletions src/Databases/DataLake/GlueCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
std::string region;
CatalogSettings settings;
DB::ASTPtr table_engine_definition;
std::unordered_set<std::string> allowed_namespaces;

bool isNamespaceAllowed(const std::string & namespace_) const;

DataLake::ICatalog::Namespaces getDatabases(const std::string & prefix, size_t limit = 0) const;
DB::Names getTablesForDatabase(const std::string & db_name, size_t limit = 0) const;
Expand Down
1 change: 1 addition & 0 deletions src/Databases/DataLake/ICatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ struct CatalogSettings
String aws_access_key_id;
String aws_secret_access_key;
String region;
String namespaces;
String aws_role_arn;
String aws_role_session_name;

Expand Down
Loading
Loading