-
Notifications
You must be signed in to change notification settings - Fork 3k
Add ParquetFileMerger for efficient row-group level file merging #14435
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
c42af16 to
6bfe165
Compare
## Why this change?
This implementation provides significant performance improvements for Parquet
file merging operations by eliminating serialization/deserialization overhead.
Benchmark results show **13x faster** file merging compared to traditional
read-rewrite approaches.
The change leverages existing Parquet library capabilities (ParquetFileWriter
appendFile API) to perform zero-copy row-group merging, making it ideal for
compaction and maintenance operations on large Iceberg tables.
TODO: 1) Encrypted tables are not supported yet. 2) Schema evolution is not handled yet
## What changed?
- Added ParquetFileMerger class for row-group level file merging
- Performs zero-copy merging using ParquetFileWriter.appendFile()
- Validates schema compatibility across all input files
- Supports merging multiple Parquet files into a single output file
- Reuses existing Apache Parquet library functionality instead of custom implementation
- Strict schema validation ensures data integrity during merge operations
- Added comprehensive error handling for schema mismatches
## Testing
- Validated in staging test environment
- Verified schema compatibility checks work correctly
- Confirmed 13x performance improvement over traditional approach
- Tested with various file sizes and row group configurations
7be3ef0 to
7f2d5b0
Compare
|
Thanks @shangxinli for the PR! At a high level, leveraging Parquet’s appendFile for row‑group merging is the right approach and a performance win. Making it opt‑in via an action option and a table property is appropriate. A couple of areas I’d like to discuss:
I’d also like to get others’ opinions. @pvary @amogh-jahagirdar @nastra @singhpk234 |
|
I have a few concerns here:
|
cae2d00 to
fa1d073
Compare
|
Thanks @huaxingao for the review and feedback! I've addressed both of your points.
Additionally, I've addressed the architectural feedback about planning vs. execution:
Let me know if there are any other concerns or improvements you'd like to see! |
|
Thanks @pvary for the detailed feedback! I've addressed your points:
|
...k/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/SparkParquetFileMergeRunner.java
Outdated
Show resolved
Hide resolved
parquet/src/main/java/org/apache/iceberg/parquet/ParquetFileMerger.java
Outdated
Show resolved
Hide resolved
8d6abd0 to
7a34353
Compare
api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
Outdated
Show resolved
Hide resolved
parquet/src/main/java/org/apache/iceberg/parquet/ParquetFileMerger.java
Outdated
Show resolved
Hide resolved
parquet/src/main/java/org/apache/iceberg/parquet/ParquetFileMerger.java
Outdated
Show resolved
Hide resolved
...k/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/SparkParquetFileMergeRunner.java
Outdated
Show resolved
Hide resolved
...k/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/SparkParquetFileMergeRunner.java
Outdated
Show resolved
Hide resolved
parquet/src/main/java/org/apache/iceberg/parquet/ParquetFileMerger.java
Outdated
Show resolved
Hide resolved
| parquetOutputFile, | ||
| schema, | ||
| ParquetFileWriter.Mode.CREATE, | ||
| ParquetWriter.DEFAULT_BLOCK_SIZE, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not very sure about this part. Should it be directly fixed to ParquetWriter.DEFAULT_BLOCK_SIZE, or does it need to be linked with the table property write.parquet.row-group-size-bytes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel using the property gives more flexibility for rewriter. But like to hear other's thoughts.
|
Thanks for the PR ! I have a question about the lineage, If the merging is only performed at the parquet layer, will the lineage information of the v3 table be disrupted? |
Good question! The lineage information for v3 tables is preserved in two ways:
Field IDs are preserved because we strictly enforce identical schemas across all files being merged. In ParquetFileMerger.java:130-136, we validate that all input files have exactly the same Parquet MessageType schema: if (!schema.equals(currentSchema)) { Field IDs are stored directly in the Parquet schema structure itself (via Type.getId()), so when we copy row groups using ParquetFileWriter.appendFile() with the validated schema, all field IDs are preserved.
Row IDs are automatically assigned by Iceberg's commit framework - we don't need special handling in the merger. Here's how it works:
This is the same mechanism used by all Iceberg write operations, so row lineage is fully preserved for v3 tables. |
a32947b to
e6595f3
Compare
parquet/src/test/java/org/apache/iceberg/parquet/TestParquetFileMerger.java
Outdated
Show resolved
Hide resolved
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
Show resolved
Hide resolved
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
Outdated
Show resolved
Hide resolved
| verifyInitialVirtualRowIds(binpackTable); | ||
| long binpackCountBefore = currentData().size(); | ||
|
|
||
| RewriteDataFiles.Result binpackResult = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we split these tests based on useParquetFileMerger? Why do we cram USE_PARQUET_ROW_GROUP_MERGE true and false in the same test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is to compare both approach have the same results. Added assume statement to let it run only once.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use useParquetFileMerger
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can run them in separate test based on useParquetFileMerger and then check the correct merger is used, and then check that the outcome is correct. We don't need to compare the results between the 2 runs. We could just compare them to the expected result.
Do I miss something?
| public void testParquetFileMergerPreservesPhysicalRowIds() throws IOException { | ||
| // Test scenario 2: Tables with physical _row_id column | ||
| // After merging, the physical _row_id should be preserved (not changed) | ||
| assumeThat(formatVersion).isGreaterThanOrEqualTo(3); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want this test run 2 times based on useParquetFileMerger? Shall we just use an assume and run only with useParquetFileMerger set to true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or if we decide it is worth to test for the useParquetFileMerger false case as well, we could just set USE_PARQUET_ROW_GROUP_MERGE based on useParquetFileMerger
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use useParquetFileMerger
| // Merge using ParquetFileMerger - should handle all partitions correctly | ||
| RewriteDataFiles.Result result = | ||
| basicRewrite(table) | ||
| .option(RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, "true") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please think through all of the cases where we explicitly set USE_PARQUET_ROW_GROUP_MERGE. Should we set it based on useParquetFileMerger? If we don't want to set it based on useParquetFileMerger, do we need to run the tests 2 times with useParquetFileMerger true and false as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree. let's use useParquetFileMerger
....0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkParquetFileMergeRunner.java
Outdated
Show resolved
Hide resolved
| // This validation would normally be done in SparkParquetFileMergeRunner.canMergeAndGetSchema | ||
| // but we're testing the sort order check that happens before calling ParquetFileMerger | ||
| // Since table has sort order, validation should fail early | ||
| if (table.sortOrder().isSorted()) { | ||
| // Should fail due to sort order | ||
| assertThat(true).isTrue(); | ||
| } else { | ||
| // If we got here, the sort order check didn't work | ||
| assertThat(false).isTrue(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get this.
What do we test here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed.
....0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkParquetFileMergeRunner.java
Outdated
Show resolved
Hide resolved
....0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkParquetFileMergeRunner.java
Show resolved
Hide resolved
|
@shangxinli: Back from the holidays! I'm fairly happy with the production part of the code. Left a few small comments. I'm happy that @SourabhBadhya is checking the Sparks side. There are still a few questions wrt the tests. Please check them if you have time. Thanks, and happy new year 🎉 ! |
00df946 to
45b0197
Compare
1. Remove Spark-specific javadoc constraints from ParquetFileMerger
- Removed "Files must not have associated delete files" constraint
- Removed "Table must not have a sort order" constraint
- These validations are only enforced in SparkParquetFileMergeRunner,
not in the ParquetFileMerger class itself
2. Fix code style in TestParquetFileMerger
- Replace 'var' with explicit types (Parquet.DataWriteBuilder, DataWriter<Record>)
- Add newlines after for loop and try-catch blocks for better readability
- Remove unused Parquet import
3. Optimize test execution in TestRewriteDataFilesAction
- Add assumeThat for comparison tests to run once instead of twice
- Use String.valueOf(useParquetFileMerger) for regular tests to test both approaches
- Remove redundant testParquetFileMergerExplicitlyEnabledAndDisabled test
4. Fix TestSparkParquetFileMergeRunner to actually call canMergeAndGetSchema
- Changed canMergeAndGetSchema from private to package-private in SparkParquetFileMergeRunner
- Updated all tests to create runner instance and call canMergeAndGetSchema()
- Removed 4 trivial tests (description, inheritance, validOptions, init)
- All remaining tests now validate actual canMergeAndGetSchema behavior
|
|
||
| private static MessageType readSchema(InputFile inputFile) throws IOException { | ||
| try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(inputFile))) { | ||
| return reader.getFooter().getFileMetaData().getSchema(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: use reader.getFileMetaData().getSchema().
| try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(inputFile))) { | ||
| // Read metadata from the first file | ||
| if (extraMetadata == null) { | ||
| extraMetadata = reader.getFooter().getFileMetaData().getKeyValueMetaData(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: use reader.getFileMetaData().getKeyValueMetaData()
| shouldHaveFiles(table, 4); | ||
|
|
||
| // Test that binPack() respects the configuration option | ||
| // When enabled, should use SparkParquetFileMergeRunner |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we check that the correct merger is used?
| // Add more data to create additional files | ||
| writeRecords(2, SCALE); | ||
| shouldHaveFiles(table, dataFilesAfterFirstMerge.size() + 2); | ||
|
|
||
| long countBefore = currentData().size(); | ||
|
|
||
| // Second merge: should preserve physical row IDs via binary copy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this testing a different feature? If so, this could be a separate test.
The first test is for generating the _row_ids, the second is copying the _row_ids.
Do I understand the intention correctly?
|
|
||
| @TestTemplate | ||
| public void testRowLineageWithPartitionedTable() throws IOException { | ||
| // Test that row lineage preservation works correctly with partitioned tables |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are descriptions of what we are testing. Should be a comment for the test method and not a comment in the first line of the test method.
Please check and fix in the other case as well
|
|
||
| @TestTemplate | ||
| public void testRowLineageWithLargerScale() throws IOException { | ||
| // Test row lineage preservation with larger number of files |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this test important in a unit test?
Do we have multiple groups, and we are checking that we are able to handle multiple groups correctly?
If so, we should check that we actually had multiple groups
|
|
||
| @TestTemplate | ||
| public void testRowLineageConsistencyAcrossMultipleMerges() throws IOException { | ||
| // Test that row lineage (row IDs) are preserved across multiple merge operations |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is enough to test:
- Generation of the _row_id and related columns
- Copy of the _row_id and related columns
- Copy of the data columns if _row_id is not needed
This kind of test seems like a duplication which uses time and resources for limited coverage gain
| when(parquetFile1.format()).thenReturn(FileFormat.PARQUET); | ||
| when(parquetFile1.specId()).thenReturn(0); | ||
| when(parquetFile1.fileSizeInBytes()).thenReturn(100L); | ||
| when(parquetFile1.path()).thenReturn(tableLocation + "/data/file1.parquet"); | ||
| when(group.rewrittenFiles()).thenReturn(Sets.newHashSet(parquetFile1)); | ||
| when(group.expectedOutputFiles()).thenReturn(1); | ||
| when(group.maxOutputFileSize()).thenReturn(Long.MAX_VALUE); | ||
| when(group.fileScanTasks()).thenReturn(Collections.emptyList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is more like a question:
- In other tests we create Data files, and Tasks instead of mocks? We have several occasions where we had issues with mocks, so we try to avoid them, or limit the place where we use them as much as possible.
Maybe something like FileGenerationUtil.generateDataFile, or TestBase.FILE_A for files?
Also for the tasks we might be able to use MockFileScanTask.mockTask?
We could also create real RewriteFileGroup objects instead of mocks
| } | ||
|
|
||
| @Test | ||
| public void testCanMergeAndGetSchemaReturnsFalseForSortedTable() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be testCanMergeAndGetSchemaReturnsNullForSortedTable? Notice Null instead of False.
| } | ||
|
|
||
| @Test | ||
| public void testCanMergeAndGetSchemaReturnsFalseForFilesWithDeleteFiles() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be testCanMergeAndGetSchemaReturnsNullForFilesWithDeleteFiles ? Notice Null instead of False.
| } | ||
|
|
||
| @Test | ||
| public void testCanMergeAndGetSchemaReturnsFalseForTableWithMultipleColumnSort() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be testCanMergeAndGetSchemaReturnsNullForTableWithMultipleColumnSort ? Notice Null instead of False.
| // Note: ParquetFileMerger.canMergeAndGetSchema would return null because | ||
| // mock files don't exist on disk, but the initial validation checks all pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then we need another check, or we should create the files in the tests
| } | ||
|
|
||
| @Test | ||
| public void testCanMergeReturnsFalseForNonParquetFile() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename the tests, so ReturnsNull instead of ReturnsFalse
|
|
||
| @Test | ||
| public void testMergeFilesWithoutRowLineage() throws IOException { | ||
| // Test that merging without firstRowIds/dataSequenceNumbers works (no row lineage columns) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These comments should be a method comment instead of a "random" comment in the beginning of the file
| } | ||
|
|
||
| @Test | ||
| public void testCanMergeReturnsFalseForPhysicalRowLineageWithNulls() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we move this test next to the other testCanMerge tests? And don't forget to change False to Null
| } | ||
|
|
||
| @Test | ||
| public void testCanMergeReturnsFalseForSameFieldNameDifferentType() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again move this test to the other testCanMergeReturns tests
| import org.junit.jupiter.api.Test; | ||
| import org.junit.jupiter.api.io.TempDir; | ||
|
|
||
| public class TestParquetFileMerger { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: The test classes don't have to be public. Make all of them package private
| } | ||
|
|
||
| @Test | ||
| public void testCanMergeThrowsForEmptyList() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please organize the test methods a bit.
Move every testCanMerge to one place, and every testMerge after.
Why this change?
This implementation provides significant performance improvements for Parquet
file merging operations by eliminating serialization/deserialization overhead.
Benchmark results show 10x faster file merging compared to traditional
read-rewrite approaches.
The change leverages existing Parquet library capabilities (ParquetFileWriter
appendFile API) to perform zero-copy row-group merging, making it ideal for
compaction and maintenance operations on large Iceberg tables.
TODO: 1) Encrypted tables are not supported yet. 2) Schema evolution is not handled yet
What changed?
Testing