4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -347,8 +347,8 @@ async-compression = { git = "https://github.com/MaterializeInc/async-compression

# Custom iceberg features for mz
# All changes should go to the `mz_changes` branch.
iceberg = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "f23586f12eea" }
iceberg-catalog-rest = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "f23586f12eea" }
iceberg = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "4e0da31" }
iceberg-catalog-rest = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "4e0da31" }

# Custom duckdb crate to support mz needs
# All changes should go to the `mz_changes` branch.
578 changes: 578 additions & 0 deletions misc/python/materialize/checks/all_checks/iceberg_sink.py

Large diffs are not rendered by default.

@@ -43,7 +43,7 @@ def schemas_null() -> str:


@externally_idempotent(False)
class SinkUpsert(Check):
class KafkaSinkUpsert(Check):
"""Basic Check on sinks from an upsert source"""

def initialize(self) -> Testdrive:
@@ -205,7 +205,7 @@ def validate(self) -> Testdrive:


@externally_idempotent(False)
class SinkTables(Check):
class KafkaSinkTables(Check):
"""Sink and re-ingest a large transaction from a table source"""

def initialize(self) -> Testdrive:
@@ -309,7 +309,7 @@ def validate(self) -> Testdrive:


@externally_idempotent(False)
class SinkNullDefaults(Check):
class KafkaSinkNullDefaults(Check):
"""Check on an Avro sink with NULL DEFAULTS"""

def initialize(self) -> Testdrive:
@@ -555,7 +555,7 @@ def validate(self) -> Testdrive:


@externally_idempotent(False)
class SinkComments(Check):
class KafkaSinkComments(Check):
"""Check on an Avro sink with comments"""

def initialize(self) -> Testdrive:
@@ -830,7 +830,7 @@ def validate(self) -> Testdrive:


@externally_idempotent(False)
class SinkAutoCreatedTopicConfig(Check):
class KafkaSinkAutoCreatedTopicConfig(Check):
"""Check on a sink with auto-created topic configuration"""

def initialize(self) -> Testdrive:
@@ -1443,7 +1443,7 @@ def validate(self) -> Testdrive:


@externally_idempotent(False)
class SinkFormat(Check):
class KafkaSinkFormat(Check):
"""Check SINK with KEY FORMAT and VALUE FORMAT"""

def initialize(self) -> Testdrive:
@@ -1506,7 +1506,7 @@ def validate(self) -> Testdrive:


@externally_idempotent(False)
class SinkPartitionByDebezium(Check):
class KafkaSinkPartitionByDebezium(Check):
"""Check SINK with ENVELOPE DEBEZIUM and PARTITION BY"""

def initialize(self) -> Testdrive:
24 changes: 24 additions & 0 deletions misc/python/materialize/checks/mzcompose_actions.py
@@ -16,6 +16,7 @@
from materialize.checks.executors import Executor
from materialize.docker import image_registry
from materialize.mz_version import MzVersion
from materialize.mzcompose.helpers.iceberg import setup_polaris_for_iceberg
from materialize.mzcompose.services.clusterd import Clusterd
from materialize.mzcompose.services.materialized import DeploymentStatus, Materialized
from materialize.mzcompose.services.ssh_bastion_host import (
@@ -216,6 +217,29 @@ def join(self, e: Executor) -> None:
e.join(self.handle)


class SetupIcebergTesting(MzcomposeAction):
def __init__(self, scenario: "Scenario", mz_service: str | None = None) -> None:
self.handle: Any | None = None
self.mz_service = mz_service
self.scenario = scenario

def execute(self, e: Executor) -> None:
c = e.mzcompose_composition()
iceberg_credentials = setup_polaris_for_iceberg(c)
input = dedent(
f"""
> CREATE VIEW iceberg_credentials AS SELECT '{iceberg_credentials[0]}' AS user, '{iceberg_credentials[1]}' AS key;
> CREATE SECRET iceberg_secret AS '{iceberg_credentials[1]}'
> CREATE CONNECTION aws_conn TO AWS (ACCESS KEY ID = '{iceberg_credentials[0]}', SECRET ACCESS KEY = SECRET iceberg_secret, ENDPOINT = 'http://minio:9000/', REGION = 'us-east-1');
> CREATE CONNECTION polaris_conn TO ICEBERG CATALOG (CATALOG TYPE = 'REST', URL = 'http://polaris:8181/api/catalog', CREDENTIAL = 'root:root', WAREHOUSE = 'default_catalog', SCOPE = 'PRINCIPAL_ROLE:ALL');"""
)

self.handle = e.testdrive(input=input, mz_service=self.mz_service)

def join(self, e: Executor) -> None:
e.join(self.handle)


class KillMz(MzcomposeAction):
def __init__(
self, mz_service: str = "materialized", capture_logs: bool = False
9 changes: 9 additions & 0 deletions misc/python/materialize/checks/scenarios.py
@@ -19,6 +19,7 @@
ConfigureMz,
KillClusterdCompute,
KillMz,
SetupIcebergTesting,
SetupSqlServerTesting,
StartClusterdCompute,
StartMz,
@@ -90,6 +91,7 @@ def run(self) -> None:
actions.insert(0, ConfigureMz(self))

sql_server_testing_setup = False
iceberg_setup = self.base_version() < MzVersion.parse_mz("v26.10.0-dev")
for index, action in enumerate(actions):
# Implicitly call configure to raise version-dependent limits
if isinstance(action, StartMz) and not action.deploy_generation:
@@ -103,6 +105,13 @@
SetupSqlServerTesting(self, mz_service=action.mz_service),
)
sql_server_testing_setup = True
if not iceberg_setup:
# Can only be run once
actions.insert(
index + 1,
SetupIcebergTesting(self, mz_service=action.mz_service),
)
iceberg_setup = True
elif isinstance(action, ReplaceEnvironmentdStatefulSet):
actions.insert(index + 1, ConfigureMz(self))

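
Note the seeding of iceberg_setup above: it starts with the version comparison rather than False, so for base versions older than v26.10.0-dev the setup is treated as already done and SetupIcebergTesting is never inserted; on newer versions it is inserted exactly once, right after the first qualifying StartMz. A dependency-free sketch of that gating, where plain version tuples and strings stand in for MzVersion and the real action objects:

    ICEBERG_MIN = (26, 10, 0)  # stand-in for MzVersion.parse_mz("v26.10.0-dev")

    def plan_actions(base_version: tuple[int, ...], actions: list[str]) -> list[str]:
        # Seeded True for old versions: setup counts as already done, so the
        # loop never inserts it. Newer versions start at False.
        iceberg_setup = base_version < ICEBERG_MIN
        for index, action in enumerate(actions):
            if action == "StartMz" and not iceberg_setup:
                actions.insert(index + 1, "SetupIcebergTesting")
                iceberg_setup = True  # can only run once
        return actions

    print(plan_actions((25, 2, 0), ["StartMz", "Manipulate"]))
    # ['StartMz', 'Manipulate'] -- base version too old, setup skipped
    print(plan_actions((26, 10, 0), ["StartMz", "Manipulate"]))
    # ['StartMz', 'SetupIcebergTesting', 'Manipulate']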
69 changes: 48 additions & 21 deletions misc/python/materialize/parallel_workload/action.py
@@ -146,6 +146,27 @@ def __init__(self, rng: random.Random, composition: Composition | None):
def run(self, exe: Executor) -> bool:
raise NotImplementedError

def create_system_connection(
self, exe: Executor, num_attempts: int = 10
) -> Connection:
try:
conn = psycopg.connect(
host=exe.db.host,
port=exe.db.ports[
"mz_system" if exe.mz_service == "materialized" else "mz_system2"
],
user="mz_system",
dbname="materialize",
)
conn.autocommit = True
return conn
except:
if num_attempts == 0:
raise
else:
time.sleep(1)
return self.create_system_connection(exe, num_attempts - 1)

def errors_to_ignore(self, exe: Executor) -> list[str]:
result = [
"permission denied for",
@@ -1752,27 +1773,6 @@ def run(self, exe: Executor) -> bool:
except Exception as e:
raise QueryError(str(e), "FlipFlags")

def create_system_connection(
self, exe: Executor, num_attempts: int = 10
) -> Connection:
try:
conn = psycopg.connect(
host=exe.db.host,
port=exe.db.ports[
"mz_system" if exe.mz_service == "materialized" else "mz_system2"
],
user="mz_system",
dbname="materialize",
)
conn.autocommit = True
return conn
except:
if num_attempts == 0:
raise
else:
time.sleep(1)
return self.create_system_connection(exe, num_attempts - 1)

def flip_flag(self, conn: Connection, flag_name: str, flag_value: str) -> None:
with conn.cursor() as cur:
cur.execute(
@@ -2864,6 +2864,32 @@ def run(self, exe: Executor) -> bool:
return True


class CheckSinkAction(Action):
def run(self, exe: Executor) -> bool:
try:
conn = self.create_system_connection(exe)
with conn.cursor() as cur:
cur.execute(
"SELECT name, type, last_status_change_at, status, error, details FROM mz_internal.mz_sink_statuses WHERE status not in ('running', 'starting', NULL)"
)
results = cur.fetchall()
if results:
results_str = "\n".join(
[
f"{name} ({sink_type}) changed status at {last_status_change_at} to {status}: {error} (details: {details})"
for name, sink_type, last_status_change_at, status, error, details in results
]
)
raise ValueError(f"Sinks are in a bad state:\n{results_str}")
        except Exception:
if exe.db.scenario not in (
Scenario.Kill,
Scenario.ZeroDowntimeDeploy,
):
raise
return True


class DropIcebergSinkAction(Action):
def errors_to_ignore(self, exe: Executor) -> list[str]:
return [
Expand Down Expand Up @@ -3142,6 +3168,7 @@ def __init__(
(DropIcebergSinkAction, 4),
(CreateKafkaSourceAction, 4),
(DropKafkaSourceAction, 4),
(CheckSinkAction, 1),
# TODO: Reenable when database-issues#8237 is fixed
# (CreateMySqlSourceAction, 4),
# (DropMySqlSourceAction, 4),
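
The create_system_connection helper is moved out of the flag-flipping action into the Action base class above so the new CheckSinkAction can reuse it; it retries once per second, recursing with a decremented counter until the attempts run out. A toy, self-contained sketch of that retry shape (fake_connect and the readiness counter are invented for the demo, standing in for psycopg.connect against a slow-starting server):

    import time

    attempts_until_ready = 3  # hypothetical: server becomes ready on the 3rd try

    def fake_connect() -> str:
        global attempts_until_ready
        attempts_until_ready -= 1
        if attempts_until_ready > 0:
            raise ConnectionError("server not ready")
        return "connection"

    def create_system_connection(num_attempts: int = 10) -> str:
        try:
            return fake_connect()
        except ConnectionError:
            if num_attempts == 0:
                raise
            time.sleep(1)  # the real helper also waits 1s between attempts
            return create_system_connection(num_attempts - 1)

    print(create_system_connection())  # succeeds on the third attempt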
49 changes: 40 additions & 9 deletions src/storage/src/metrics/sink/iceberg.rs
@@ -11,7 +11,11 @@

use mz_ore::{
metric,
metrics::{DeleteOnDropCounter, DeleteOnDropGauge, IntCounterVec, UIntGaugeVec},
metrics::{
DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec, IntCounterVec,
UIntGaugeVec,
},
stats::histogram_seconds_buckets,
};
use mz_repr::GlobalId;
use prometheus::core::AtomicU64;
@@ -30,6 +34,10 @@ pub(crate) struct IcebergSinkMetricDefs {
pub commit_failures: IntCounterVec,
/// Commit conflicts in the iceberg sink.
pub commit_conflicts: IntCounterVec,
/// Time spent committing batches to Iceberg.
pub commit_duration_seconds: HistogramVec,
/// Time spent closing Iceberg DeltaWriters.
pub writer_close_duration_seconds: HistogramVec,
}

impl IcebergSinkMetricDefs {
@@ -41,40 +49,51 @@
pub(crate) fn register_with(registry: &mz_ore::metrics::MetricsRegistry) -> Self {
Self {
data_files_written: registry.register(metric!(
name: "sink_iceberg_data_files_written",
name: "mz_sink_iceberg_data_files_written",
help: "Number of data files written by the iceberg sink",
var_labels: ["sink_id", "worker_id"]
)),
delete_files_written: registry.register(metric!(
name: "sink_iceberg_delete_files_written",
name: "mz_sink_iceberg_delete_files_written",
help: "Number of delete files written by the iceberg sink",
var_labels: ["sink_id", "worker_id"]
)),
stashed_rows: registry.register(metric!(
name: "sink_iceberg_stashed_rows",
name: "mz_sink_iceberg_stashed_rows",
help: "Number of stashed rows in the iceberg sink",
var_labels: ["sink_id", "worker_id"]
)),
snapshots_committed: registry.register(metric!(
name: "sink_iceberg_snapshots_committed",
name: "mz_sink_iceberg_snapshots_committed",
help: "Number of snapshots committed by the iceberg sink",
var_labels: ["sink_id", "worker_id"]
)),
commit_failures: registry.register(metric!(
name: "sink_iceberg_commit_failures",
name: "mz_sink_iceberg_commit_failures",
help: "Number of commit failures in the iceberg sink",
var_labels: ["sink_id", "worker_id"]
)),
commit_conflicts: registry.register(metric!(
name: "sink_iceberg_commit_conflicts",
name: "mz_sink_iceberg_commit_conflicts",
help: "Number of commit conflicts in the iceberg sink",
var_labels: ["sink_id", "worker_id"]
)),
commit_duration_seconds: registry.register(metric!(
name: "mz_sink_iceberg_commit_duration_seconds",
help: "Time spent committing batches to Iceberg in seconds",
var_labels: ["sink_id", "worker_id"],
buckets: histogram_seconds_buckets(0.001, 32.0),
)),
writer_close_duration_seconds: registry.register(metric!(
name: "mz_sink_iceberg_writer_close_duration_seconds",
help: "Time spent closing Iceberg DeltaWriters in seconds",
var_labels: ["sink_id", "worker_id"],
buckets: histogram_seconds_buckets(0.001, 32.0),
)),
}
}
}

#[derive(Clone)]
pub(crate) struct IcebergSinkMetrics {
/// Number of data files written by the iceberg sink.
pub data_files_written: DeleteOnDropCounter<AtomicU64, Vec<String>>,
Expand All @@ -88,6 +107,10 @@ pub(crate) struct IcebergSinkMetrics {
pub commit_failures: DeleteOnDropCounter<AtomicU64, Vec<String>>,
/// Number of commit conflicts in the iceberg sink.
pub commit_conflicts: DeleteOnDropCounter<AtomicU64, Vec<String>>,
/// Time spent committing batches to Iceberg.
pub commit_duration_seconds: DeleteOnDropHistogram<Vec<String>>,
/// Time spent closing Iceberg DeltaWriters.
pub writer_close_duration_seconds: DeleteOnDropHistogram<Vec<String>>,
}

impl IcebergSinkMetrics {
@@ -108,7 +131,15 @@ impl IcebergSinkMetrics {
commit_failures: defs
.commit_failures
.get_delete_on_drop_metric(labels.clone()),
commit_conflicts: defs.commit_conflicts.get_delete_on_drop_metric(labels),
commit_conflicts: defs
.commit_conflicts
.get_delete_on_drop_metric(labels.clone()),
commit_duration_seconds: defs
.commit_duration_seconds
.get_delete_on_drop_metric(labels.clone()),
writer_close_duration_seconds: defs
.writer_close_duration_seconds
.get_delete_on_drop_metric(labels),
}
}
}
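
Both new histograms share histogram_seconds_buckets(0.001, 32.0). Assuming that mz_ore helper generates doubling bucket edges from 1ms up to roughly 32s (an assumption about its behavior, not verified here), the bucket shape would look like this sketch:

    def histogram_seconds_buckets(min_s: float, max_s: float) -> list[float]:
        # Hypothetical re-implementation: edges double from min_s and stop
        # once they exceed max_s. The real helper's exact edges may differ.
        buckets = []
        edge = min_s
        while edge <= max_s:
            buckets.append(edge)
            edge *= 2.0
        return buckets

    print(histogram_seconds_buckets(0.001, 32.0))
    # [0.001, 0.002, 0.004, ..., 8.192, 16.384] -- 15 doubling buckets

A small fixed set of exponential buckets keeps these per-commit and per-writer-close timings cheap to record while still separating millisecond-scale closes from multi-second commits.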