[lake/iceberg] support nested row types for Iceberg tiering #2278
I will add more test cases for nested row support after #2266 is merged to avoid conflicts in the overlapping test class.
8d351d1 to ac215c8
@luoyuxia PTAL
Pull request overview
This PR adds support for nested row types in Fluss Iceberg tiering, enabling complex nested data structures to be converted between Fluss and Iceberg formats. The implementation handles both simple and deeply nested row structures, including rows containing arrays.
Key changes:
- Implemented bidirectional conversion between Fluss RowType and Iceberg StructType
- Added support for reading nested rows from Iceberg records back to Fluss rows
- Extended test coverage for nested row scenarios across primary key and log tables
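As a rough illustration of the read path in the second bullet, the sketch below exposes a nested record through a row-style view. `SimpleRecord` and `RecordAsRow` are hypothetical stand-ins, not the Iceberg `Record` or Fluss `InternalRow` classes. Only the `getRow(pos, numFields)` call shape is taken from the diff in this PR; wrapping the nested record lazily rather than copying it is an assumption.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration; these are NOT the Fluss or Iceberg classes.
final class NestedReadSketch {

    /** Stand-in for an Iceberg-style record: positional values, possibly nested. */
    static final class SimpleRecord {
        private final List<Object> values;

        SimpleRecord(Object... values) {
            this.values = Arrays.asList(values);
        }

        Object get(int pos) {
            return values.get(pos);
        }

        int size() {
            return values.size();
        }
    }

    /** Stand-in for a Fluss-style row that reads through to the wrapped record. */
    static final class RecordAsRow {
        private final SimpleRecord record;

        RecordAsRow(SimpleRecord record) {
            this.record = record;
        }

        boolean isNullAt(int pos) {
            return record.get(pos) == null;
        }

        int getInt(int pos) {
            return (Integer) record.get(pos);
        }

        String getString(int pos) {
            return (String) record.get(pos);
        }

        /** Wraps the nested record in another row view; a null field stays null. */
        RecordAsRow getRow(int pos, int numFields) {
            Object nested = record.get(pos);
            if (nested == null) {
                return null;
            }
            SimpleRecord nestedRecord = (SimpleRecord) nested;
            if (nestedRecord.size() != numFields) {
                throw new IllegalArgumentException(
                        "Expected " + numFields + " fields but found " + nestedRecord.size());
            }
            return new RecordAsRow(nestedRecord);
        }
    }
}
```

Because each call to `getRow` returns another view of the same kind, deeper nesting needs no extra code in this sketch: `row.getRow(1, 2).getRow(0, 3)` simply chains two wrappers.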
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| FlussDataTypeToIcebergDataType.java | Implements conversion from Fluss RowType to Iceberg StructType with proper field handling |
| IcebergConversions.java | Adds reverse conversion from Iceberg StructType to Fluss RowType |
| IcebergRecordAsFlussRow.java | Implements getRow method to convert nested Iceberg records to Fluss rows |
| FlussRowAsIcebergRecord.java | Adds converter for nested row fields to handle Fluss to Iceberg conversion |
| IcebergRecordAsFlussRowTest.java | Tests conversion of nested Iceberg records to Fluss rows |
| FlussRowAsIcebergRecordTest.java | Tests conversion of nested Fluss rows to Iceberg records |
| FlinkUnionReadPrimaryKeyTableITCase.java | Integration tests for nested row data in primary key tables |
| FlinkUnionReadLogTableITCase.java | Integration tests for nested row data in log tables |
    return row -> {
        InternalRow nestedRow = row.getRow(pos, rowType.getFieldCount());
        return new FlussRowAsIcebergRecord(nestedStructType, rowType, nestedRow);
Copilot AI, Jan 7, 2026
Potential null pointer exception if nestedRow is null. The constructor call should handle null values, but the lambda doesn't check before creating the FlussRowAsIcebergRecord. Consider returning null when nestedRow is null to match the pattern used in the array converter.
Suggested change:
    - return new FlussRowAsIcebergRecord(nestedStructType, rowType, nestedRow);
    + return nestedRow == null
    +         ? null
    +         : new FlussRowAsIcebergRecord(nestedStructType, rowType, nestedRow);
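The null check in this suggestion can be tried in isolation with the minimal sketch below. `Row`, `RecordView`, and `nestedRowConverter` are hypothetical names invented for illustration; the real `FlussRowAsIcebergRecord` constructor and `InternalRow` accessors are not reproduced here, and whether the real constructor rejects null is an assumption modelled with `Objects.requireNonNull`.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical stand-ins for illustration; these are NOT the Fluss or Iceberg classes.
final class NullSafeConverterSketch {

    /** Stand-in for a row with positional field access. */
    static final class Row {
        private final Object[] fields;

        Row(Object... fields) {
            this.fields = fields;
        }

        Object get(int pos) {
            return fields[pos];
        }
    }

    /** Stand-in for the record wrapper; assumed here to reject a null row. */
    static final class RecordView {
        final Row row;

        RecordView(Row row) {
            this.row = Objects.requireNonNull(row, "row");
        }
    }

    /** Builds a converter for the nested row at position pos that maps null to null. */
    static Function<Row, RecordView> nestedRowConverter(final int pos) {
        return row -> {
            Row nested = (Row) row.get(pos);
            // Guard before wrapping so a null nested row never reaches the constructor.
            return nested == null ? null : new RecordView(nested);
        };
    }
}
```

With the guard in place, `nestedRowConverter(1).apply(new Row(1, null))` returns `null` instead of failing inside the wrapper's constructor.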
    } else if (icebergType.isStructType()) {
        Types.StructType structType = icebergType.asStructType();
        List<DataField> fields = new ArrayList<>();
        for (Types.NestedField nestedField : structType.fields()) {
            DataType fieldType = convertIcebergTypeToFlussType(nestedField.type());
            fields.add(new DataField(nestedField.name(), fieldType));
        }
        return DataTypes.ROW(fields.toArray(new DataField[0]));
    }
Copilot AI, Jan 7, 2026
The struct type handling is added within a sequence of if-else statements but the closing brace at line 139 appears disconnected from the opening at line 133. The control flow suggests this else-if block should close at line 141, but line 139 has a closing brace that doesn't align with the logic structure. Verify the brace placement is correct.
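To make the intended control flow easier to check, here is a self-contained sketch of a recursive struct-to-row conversion written as a single if / else-if / else chain. `SrcType`, `SrcField`, and `convert` are hypothetical stand-ins for the Iceberg `Type` hierarchy and `convertIcebergTypeToFlussType`; for brevity the target type is rendered as a string rather than a Fluss `DataType`, which is a simplification and not how the PR represents types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration; these are NOT the Fluss or Iceberg classes.
final class StructToRowSketch {

    /** Stand-in for a named field inside a struct. */
    static final class SrcField {
        final String name;
        final SrcType type;

        SrcField(String name, SrcType type) {
            this.name = name;
            this.type = type;
        }
    }

    /** Stand-in source type: exactly one of primitive, list, or struct. */
    static final class SrcType {
        final String primitiveName;
        final SrcType elementType;
        final List<SrcField> fields;

        private SrcType(String primitiveName, SrcType elementType, List<SrcField> fields) {
            this.primitiveName = primitiveName;
            this.elementType = elementType;
            this.fields = fields;
        }

        static SrcType primitive(String name) {
            return new SrcType(name, null, null);
        }

        static SrcType list(SrcType element) {
            return new SrcType(null, element, null);
        }

        static SrcType struct(SrcField... fields) {
            return new SrcType(null, null, Arrays.asList(fields));
        }

        boolean isListType() {
            return elementType != null;
        }

        boolean isStructType() {
            return fields != null;
        }
    }

    /** Recursively renders the target type; every branch returns, so braces pair up one to one. */
    static String convert(SrcType type) {
        if (type.isListType()) {
            return "ARRAY<" + convert(type.elementType) + ">";
        } else if (type.isStructType()) {
            List<String> parts = new ArrayList<>();
            for (SrcField field : type.fields) {
                // Recurse so structs inside structs (or arrays inside structs) are handled.
                parts.add(field.name + " " + convert(field.type));
            }
            return "ROW<" + String.join(", ", parts) + ">";
        } else {
            return type.primitiveName;
        }
    }
}
```

In this shape the struct branch opens and closes within the same chain, which is the property the comment above asks the author to verify in the real method.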
Purpose
Linked issue: close #2253 (comment)
Support nested row types for Iceberg tiering
Brief change log
Added support for nested row type conversion in Fluss Iceberg tiering.
Tests
- FlussRowAsIcebergRecordTest.testNestedRow: Verifies accurate conversion of Fluss InternalRow containing nested fields to Iceberg Record.
- FlinkUnionReadPrimaryKeyTableITCase: Validates that nested row data in Primary Key tables is correctly tiered to Iceberg and readable via Flink.
- FlinkUnionReadLogTableITCase: Validates that nested row data in Log tables is correctly tiered to Iceberg and readable via Flink, including partitioned scenarios.
API and Format
Documentation