from __future__ import annotations
import os
import shutil
import tempfile
from datetime import date, datetime
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
def main() -> None:
    """Run a self-contained repro of an Iceberg ``MERGE INTO`` planning failure.

    Steps performed:

    1. Start a local Spark session whose default catalog is a Hadoop-type
       Iceberg catalog rooted in a temporary warehouse directory.
    2. Create a format-version 3 Iceberg table and append two rows.
    3. Build a merge source by full-outer-joining the table itself with a
       one-row replacement DataFrame, tagging each row with a
       ``_merge_operation`` of ``delete``, ``insert``, ``noop`` or ``update``.
    4. Register that merge source as a temp view and run ``MERGE INTO`` on the
       same table, so the merge source reads from the merge target.

    Cleanup (drop table, stop session, remove temp directories) always runs,
    including when session start-up fails or a statement raises. Any exception
    raised by Spark propagates to the caller after cleanup has been attempted.
    """
    table_name = "iceberg.temporal.no_plan_for_tablereference"
    # Directories are recorded as soon as they exist so the outer ``finally``
    # can remove them even if a later setup step (for example session
    # creation) fails.
    scratch_dirs: list[Path] = []
    spark = None
    try:
        warehouse_dir = Path(tempfile.mkdtemp(prefix="spark-iceberg-warehouse-"))
        scratch_dirs.append(warehouse_dir)
        ivy_dir = Path(tempfile.mkdtemp(prefix="spark-ivy-cache-"))
        scratch_dirs.append(ivy_dir)

        # ``setdefault`` keeps any value the caller already exported.
        os.environ.setdefault("SPARK_LOCAL_IP", "127.0.0.1")
        os.environ.setdefault("SPARK_LOCAL_HOSTNAME", "localhost")

        spark = (
            SparkSession.builder.master("local[2]")
            .appName("repro-no-plan-for-tablereference")
            .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0")
            # Resolve packages into a private Ivy cache that is deleted on exit.
            .config("spark.jars.ivy", str(ivy_dir))
            .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
            .config("spark.sql.defaultCatalog", "iceberg")
            .config("spark.sql.catalogImplementation", "in-memory")
            .config("spark.sql.catalog.iceberg.type", "hadoop")
            .config("spark.sql.catalog.iceberg.warehouse", str(warehouse_dir))
            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
            .config("spark.sql.shuffle.partitions", "4")
            .getOrCreate()
        )

        # DataFrame schema mirroring the columns declared in CREATE TABLE below.
        row_schema = T.StructType(
            [
                T.StructField("asset_id", T.StringType(), False),
                T.StructField("state_valid_from", T.DateType(), False),
                T.StructField("state_valid_to", T.DateType(), False),
                T.StructField("customer_name", T.StringType(), True),
                T.StructField("state_hash", T.StringType(), False),
                T.StructField("generated_at", T.TimestampType(), False),
            ]
        )

        spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.temporal")
        spark.sql(
            f"""
            CREATE TABLE {table_name} (
                asset_id STRING NOT NULL,
                state_valid_from DATE NOT NULL,
                state_valid_to DATE NOT NULL,
                customer_name STRING,
                state_hash STRING NOT NULL,
                generated_at TIMESTAMP NOT NULL
            )
            USING iceberg
            TBLPROPERTIES ('format-version' = '3')
            """
        )

        # Initial table contents: one row that the replacement data matches on
        # the join keys and one row that has no counterpart in it.
        spark.createDataFrame(
            [
                ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "keep", "hash-keep", datetime(2024, 2, 1)),
                ("EAN-1", date(2024, 2, 11), date(2024, 2, 20), "delete-me", "hash-delete", datetime(2024, 2, 11)),
            ],
            schema=row_schema,
        ).writeTo(table_name).append()

        replacement_state_df = spark.createDataFrame(
            [
                ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "updated", "hash-updated", datetime(2030, 1, 1)),
            ],
            schema=row_schema,
        )

        join_columns = ["asset_id", "state_valid_from", "state_valid_to"]
        target_columns = row_schema.fieldNames()

        # Full outer join of the current table ("target") with the replacement
        # rows ("source"). Key columns are coalesced so rows present on only
        # one side keep their keys; non-key columns come from the source side.
        merge_source_df = (
            spark.table(table_name)
            .alias("target")
            .join(replacement_state_df.alias("source"), on=join_columns, how="full_outer")
            .select(
                F.coalesce(F.col("source.asset_id"), F.col("target.asset_id")).alias("asset_id"),
                F.coalesce(F.col("source.state_valid_from"), F.col("target.state_valid_from")).alias(
                    "state_valid_from"
                ),
                F.coalesce(F.col("source.state_valid_to"), F.col("target.state_valid_to")).alias("state_valid_to"),
                *[F.col(f"source.{column}").alias(column) for column in target_columns if column not in join_columns],
                # Only in target -> delete; only in source -> insert;
                # in both with equal hash -> noop; otherwise -> update.
                F.when(F.col("source.asset_id").isNull(), F.lit("delete"))
                .when(F.col("target.asset_id").isNull(), F.lit("insert"))
                .when(F.col("source.state_hash") == F.col("target.state_hash"), F.lit("noop"))
                .otherwise(F.lit("update"))
                .alias("_merge_operation"),
            )
        )
        merge_source_df.createOrReplaceTempView("merge_source")

        # The statement under test: the USING view is itself derived from the
        # table being merged into.
        spark.sql(
            f"""
            MERGE INTO {table_name} target
            USING merge_source source
            ON target.asset_id = source.asset_id
                AND target.state_valid_from = source.state_valid_from
                AND target.state_valid_to = source.state_valid_to
            WHEN MATCHED AND source._merge_operation = 'delete' THEN DELETE
            WHEN MATCHED AND source._merge_operation = 'update' THEN
                UPDATE SET
                    target.asset_id = source.asset_id,
                    target.state_valid_from = source.state_valid_from,
                    target.state_valid_to = source.state_valid_to,
                    target.customer_name = source.customer_name,
                    target.state_hash = source.state_hash,
                    target.generated_at = source.generated_at
            WHEN NOT MATCHED AND source._merge_operation = 'insert' THEN
                INSERT (asset_id, state_valid_from, state_valid_to, customer_name, state_hash, generated_at)
                VALUES (
                    source.asset_id,
                    source.state_valid_from,
                    source.state_valid_to,
                    source.customer_name,
                    source.state_hash,
                    source.generated_at
                )
            """
        )
    finally:
        # Nested so that each cleanup step runs even if the previous one
        # raises: a failing DROP TABLE must not skip stop(), and neither may
        # skip removal of the temp directories.
        try:
            if spark is not None:
                try:
                    spark.sql(f"DROP TABLE IF EXISTS {table_name}")
                finally:
                    spark.stop()
        finally:
            for scratch_dir in scratch_dirs:
                shutil.rmtree(scratch_dir, ignore_errors=True)


if __name__ == "__main__":
    main()
It should not fail with an internal planner assertion.
org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase planning failed with an internal error.
...
Caused by: java.lang.AssertionError: assertion failed: No plan for TableReference[asset_id#..., state_valid_from#..., state_valid_to#..., customer_name#..., state_hash#..., generated_at#...] iceberg.temporal.no_plan_for_tablereference
...
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
...
at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.compileSubquery(InsertAdaptiveSparkPlan.scala:167)
Downgrading to Spark 4.0.3 resolved the issue for me.
Spark version
Iceberg version
Repro
Self-contained repro:
Expected
`MERGE INTO` should complete successfully. It should not fail with an internal planner assertion.
Actual
Spark fails during planning with an internal error:
Workaround:
Downgrading to Spark 4.0.3 resolved the issue for me.
Notes