Skip to content

MERGE INTO over Iceberg temp-view source can fail with assertion failed: No plan for TableReference[...] #56515

Description

@mrjoe7

Spark version

  • 4.1.2

Iceberg version

  • 1.11.0
  • Spark runtime package: org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0

Repro

Self-contained repro:

from __future__ import annotations

import os
import shutil
import tempfile
from datetime import date, datetime
from pathlib import Path

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T


def main() -> None:
    """Reproduce the "No plan for TableReference" MERGE INTO failure.

    Steps:
      1. Create a local Hadoop-catalog Iceberg table and seed it with two rows.
      2. Build a temp view whose lineage reads from that same table. The view is
         a full outer join of the table against a one-row replacement DataFrame,
         tagging each key with a ``_merge_operation`` of delete/insert/noop/update.
      3. Run ``MERGE INTO`` the table ``USING`` that temp view.

    On Spark 4.1.2 with Iceberg 1.11.0, step 3 is reported to fail during
    planning with ``AssertionError: No plan for TableReference[...]``.

    Every acquired resource is released even if a step fails. The table is
    dropped, the Spark session is stopped, and both temporary directories
    (warehouse and Ivy cache) are removed.
    """
    import contextlib  # function-scope import keeps the repro a single paste-able function

    table_name = "iceberg.temporal.no_plan_for_tablereference"
    row_schema = T.StructType(
        [
            T.StructField("asset_id", T.StringType(), False),
            T.StructField("state_valid_from", T.DateType(), False),
            T.StructField("state_valid_to", T.DateType(), False),
            T.StructField("customer_name", T.StringType(), True),
            T.StructField("state_hash", T.StringType(), False),
            T.StructField("generated_at", T.TimestampType(), False),
        ]
    )

    # Each cleanup is registered immediately after its resource is acquired.
    # ExitStack runs callbacks in LIFO order and keeps running the remaining
    # ones even if an earlier callback raises. That way a failure in session
    # startup or in DROP TABLE cannot leak the session or the temp directories.
    with contextlib.ExitStack() as cleanup:
        warehouse_dir = Path(tempfile.mkdtemp(prefix="spark-iceberg-warehouse-"))
        cleanup.callback(shutil.rmtree, warehouse_dir, ignore_errors=True)
        ivy_dir = Path(tempfile.mkdtemp(prefix="spark-ivy-cache-"))
        cleanup.callback(shutil.rmtree, ivy_dir, ignore_errors=True)

        # setdefault: respect values the caller already exported.
        os.environ.setdefault("SPARK_LOCAL_IP", "127.0.0.1")
        os.environ.setdefault("SPARK_LOCAL_HOSTNAME", "localhost")

        spark = (
            SparkSession.builder.master("local[2]")
            .appName("repro-no-plan-for-tablereference")
            .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0")
            .config("spark.jars.ivy", str(ivy_dir))
            .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
            .config("spark.sql.defaultCatalog", "iceberg")
            .config("spark.sql.catalogImplementation", "in-memory")
            .config("spark.sql.catalog.iceberg.type", "hadoop")
            .config("spark.sql.catalog.iceberg.warehouse", str(warehouse_dir))
            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
            .config("spark.sql.shuffle.partitions", "4")
            .getOrCreate()
        )
        cleanup.callback(spark.stop)
        # Registered after spark.stop so it runs *before* it (LIFO): the table
        # must be dropped while the session is still alive.
        cleanup.callback(spark.sql, f"DROP TABLE IF EXISTS {table_name}")

        spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.temporal")
        spark.sql(
            f"""
            CREATE TABLE {table_name} (
                asset_id STRING NOT NULL,
                state_valid_from DATE NOT NULL,
                state_valid_to DATE NOT NULL,
                customer_name STRING,
                state_hash STRING NOT NULL,
                generated_at TIMESTAMP NOT NULL
            )
            USING iceberg
            TBLPROPERTIES ('format-version' = '3')
            """
        )

        # Seed rows: the first key is replaced below; the second has no
        # replacement and is therefore tagged for deletion.
        spark.createDataFrame(
            [
                ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "keep", "hash-keep", datetime(2024, 2, 1)),
                ("EAN-1", date(2024, 2, 11), date(2024, 2, 20), "delete-me", "hash-delete", datetime(2024, 2, 11)),
            ],
            schema=row_schema,
        ).writeTo(table_name).append()

        replacement_state_df = spark.createDataFrame(
            [
                ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "updated", "hash-updated", datetime(2030, 1, 1)),
            ],
            schema=row_schema,
        )

        join_columns = ["asset_id", "state_valid_from", "state_valid_to"]
        target_columns = row_schema.fieldNames()
        # The merge source reads the MERGE target table itself. This
        # self-reference is part of the shape reported to trigger the failure.
        merge_source_df = (
            spark.table(table_name)
            .alias("target")
            .join(replacement_state_df.alias("source"), on=join_columns, how="full_outer")
            .select(
                # Key columns fall back to the target side so delete rows
                # (no source match) still carry their keys into the MERGE.
                F.coalesce(F.col("source.asset_id"), F.col("target.asset_id")).alias("asset_id"),
                F.coalesce(F.col("source.state_valid_from"), F.col("target.state_valid_from")).alias(
                    "state_valid_from"
                ),
                F.coalesce(F.col("source.state_valid_to"), F.col("target.state_valid_to")).alias("state_valid_to"),
                *[F.col(f"source.{column}").alias(column) for column in target_columns if column not in join_columns],
                F.when(F.col("source.asset_id").isNull(), F.lit("delete"))
                .when(F.col("target.asset_id").isNull(), F.lit("insert"))
                .when(F.col("source.state_hash") == F.col("target.state_hash"), F.lit("noop"))
                .otherwise(F.lit("update"))
                .alias("_merge_operation"),
            )
        )
        merge_source_df.createOrReplaceTempView("merge_source")

        # 'noop' rows match on the keys but satisfy no WHEN clause, so the
        # MERGE leaves them untouched.
        spark.sql(
            f"""
            MERGE INTO {table_name} target
            USING merge_source source
            ON target.asset_id = source.asset_id
               AND target.state_valid_from = source.state_valid_from
               AND target.state_valid_to = source.state_valid_to
            WHEN MATCHED AND source._merge_operation = 'delete' THEN DELETE
            WHEN MATCHED AND source._merge_operation = 'update' THEN
                UPDATE SET
                    target.asset_id = source.asset_id,
                    target.state_valid_from = source.state_valid_from,
                    target.state_valid_to = source.state_valid_to,
                    target.customer_name = source.customer_name,
                    target.state_hash = source.state_hash,
                    target.generated_at = source.generated_at
            WHEN NOT MATCHED AND source._merge_operation = 'insert' THEN
                INSERT (asset_id, state_valid_from, state_valid_to, customer_name, state_hash, generated_at)
                VALUES (
                    source.asset_id,
                    source.state_valid_from,
                    source.state_valid_to,
                    source.customer_name,
                    source.state_hash,
                    source.generated_at
                )
            """
        )


if __name__ == "__main__":
    # Script entry point: main() returns None, so this exits with status 0 on
    # success; any exception raised by the repro propagates unchanged.
    raise SystemExit(main())

Expected

MERGE INTO should complete successfully.

It should not fail with an internal planner assertion.

Actual

Spark fails during planning with an internal error:

org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase planning failed with an internal error.
...
Caused by: java.lang.AssertionError: assertion failed: No plan for TableReference[asset_id#..., state_valid_from#..., state_valid_to#..., customer_name#..., state_hash#..., generated_at#...] iceberg.temporal.no_plan_for_tablereference
...
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
...
at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.compileSubquery(InsertAdaptiveSparkPlan.scala:167)

Workaround:

Downgrading to Spark 4.0.3 resolves the issue for me.

Notes

  • The failure reproduces consistently with local Spark 4.1.2
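  • The minimal shape that triggers the failure appears to be:
    • MERGE INTO an Iceberg table
    • USING a temp view
    • the temp view's lineage reads from the same target Iceberg table
    • query planning then fails on the TableReference[...] node (the trace shows this during subquery compilation in InsertAdaptiveSparkPlan.compileSubquery)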

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions