
Conversation

@shangxinli (Contributor) commented Oct 28, 2025

Why this change?

This implementation provides significant performance improvements for Parquet
file merging operations by eliminating serialization/deserialization overhead.
Benchmark results show 13x faster file merging compared to traditional
read-rewrite approaches.

The change leverages existing Parquet library capabilities (ParquetFileWriter
appendFile API) to perform zero-copy row-group merging, making it ideal for
compaction and maintenance operations on large Iceberg tables.

TODO: 1) Encrypted tables are not supported yet. 2) Schema evolution is not handled yet.
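
For reference, a minimal sketch (not the PR's code) of appendFile-based merging with the Parquet API; the helper class name is made up, and key-value metadata handling is omitted:

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.schema.MessageType;

class RowGroupMergeSketch {
  // Reads the schema from the first input, then appends each file's row groups
  // byte-for-byte into a single output file (no page decode/encode).
  static void merge(List<Path> inputs, Path output, Configuration conf) throws IOException {
    MessageType schema;
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(inputs.get(0), conf))) {
      schema = reader.getFileMetaData().getSchema();
    }

    ParquetFileWriter writer =
        new ParquetFileWriter(
            HadoopOutputFile.fromPath(output, conf),
            schema,
            ParquetFileWriter.Mode.CREATE,
            ParquetWriter.DEFAULT_BLOCK_SIZE,
            ParquetWriter.MAX_PADDING_SIZE_DEFAULT);
    writer.start();
    for (Path input : inputs) {
      writer.appendFile(HadoopInputFile.fromPath(input, conf));
    }
    writer.end(Collections.emptyMap());
  }
}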

What changed?

  • Added ParquetFileMerger class for row-group level file merging
    • Performs zero-copy merging using ParquetFileWriter.appendFile()
    • Validates schema compatibility across all input files
    • Supports merging multiple Parquet files into a single output file
  • Reuses existing Apache Parquet library functionality instead of custom implementation
  • Strict schema validation ensures data integrity during merge operations
  • Added comprehensive error handling for schema mismatches

Testing

  • Validated in staging test environment
  • Verified schema compatibility checks work correctly
  • Confirmed 13x performance improvement over traditional approach
  • Tested with various file sizes and row group configurations

@huaxingao (Contributor)

Thanks @shangxinli for the PR! At a high level, leveraging Parquet’s appendFile for row‑group merging is the right approach and a performance win. Making it opt‑in via an action option and a table property is appropriate.

A couple of areas I’d like to discuss:

  • IO integration: Would it make sense to route IO through table.io()/OutputFileFactory rather than Hadoop IO?
  • Executor/driver split: Should executors only write files and return locations/sizes, with DataFiles (and metrics) constructed on the driver?

I’d also like to get others’ opinions. @pvary @amogh-jahagirdar @nastra @singhpk234

@pvary (Contributor) commented Nov 3, 2025

I have a few concerns here:

  • I would prefer if the decision to do the row-group level merging is made at the action level, and not leaked into the table properties
  • I would prefer to check the requirements as soon as possible and fail, or fall back with logging to the normal rewrite if the requirements are not met
  • In the planning we can create groups with the expected sizes; in this case the runner could rewrite whole groups and would not need to split the planned groups to the expected file sizes
  • Always using HadoopFileIO could be problematic. The catalog might define a different FileIO implementation. We should handle this case correctly and use the Catalog/Table provided FileIO
  • We don't reuse the ParquetFileMerger object. In this case, I usually prefer static methods.

@shangxinli (Contributor, Author)

Thanks @huaxingao for the review and feedback! I've addressed both of your points.

  1. IO integration:
    Good catch! I've updated the implementation to use table.io() instead of hardcoding HadoopFileIO. The new approach:
  • Executors still use Hadoop Configuration for the actual Parquet file merging (since ParquetFileMerger internally uses Parquet's appendFile which requires Hadoop APIs)
  • Driver now uses table.io().newInputFile() to read metrics, which properly respects the catalog's configured FileIO implementation
  • This ensures compatibility with different storage systems (S3, GCS, Azure, custom FileIO implementations)
  2. Executor/driver split:
    I've refactored to follow the recommended pattern:
  • Executors: Only perform the file merge operation and return lightweight metadata (file path, size) via a MergeResult object
  • Driver: Receives the metadata, reads metrics using table.io(), and constructs the full DataFile objects
  • This minimizes serialization overhead and keeps heavyweight objects on the driver side
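
To make the split concrete, here is a rough driver-side sketch; the class name and the assumption of an unpartitioned spec are illustrative, while DataFiles, MetricsConfig, and ParquetUtil.fileMetrics are existing Iceberg APIs:

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.ParquetUtil;

class DriverSideDataFileSketch {
  // Executors return only (path, size); the driver reads metrics through the
  // table's FileIO and builds the DataFile. Partition values are omitted for
  // brevity (an unpartitioned spec is assumed).
  static DataFile toDataFile(Table table, String path, long sizeInBytes) {
    InputFile mergedFile = table.io().newInputFile(path);
    Metrics metrics = ParquetUtil.fileMetrics(mergedFile, MetricsConfig.forTable(table));
    return DataFiles.builder(table.spec())
        .withPath(path)
        .withFileSizeInBytes(sizeInBytes)
        .withFormat(FileFormat.PARQUET)
        .withMetrics(metrics)
        .build();
  }
}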

Additionally, I've addressed the architectural feedback about planning vs. execution:

  • Removed the groupFilesBySize() logic from the runner - the planner already creates appropriately-sized groups
  • Runner now merges the entire file group into a single output file without further splitting
  • This creates a cleaner separation where planning happens in the planner and execution happens in the runner

Let me know if there are any other concerns or improvements you'd like to see!

@shangxinli (Contributor, Author)

Thanks @pvary for the detailed feedback! I've addressed your points:

  1. Decision at action level, not table properties:
    Done - Removed the PARQUET_USE_FILE_MERGER from TableProperties entirely and stripped out the fallback logic. Now it only checks the action options.

  2. Early validation with proper fallback:
    Done - Flipped the logic around to validate upfront with canUseMerger() before attempting the merge. Also beefed up the validation to actually check schema compatibility, not just file format. If anything fails, it logs and falls back to the standard rewrite (see the sketch after this list).

  3. Planning creates expected sizes, runner doesn't split:
    Done - Nuked the whole groupFilesBySize() method. The runner now just merges whatever the planner gave it into a single file - no more re-grouping.

  4. Use Catalog/Table FileIO instead of HadoopFileIO:
    Done - Removed HadoopFileIO completely. Executors now just return path + size, and the driver reads metrics using table().io() which respects whatever FileIO the catalog configured.

  5. Static methods instead of object creation:
    Done - Converted ParquetFileMerger to a utility class with private constructor and all static methods. No more new ParquetFileMerger() calls anywhere.
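
For illustration, the validation-with-fallback flow described in point 2 could look roughly like this; the method and variable names are assumptions based on this thread, not the PR's exact code:

// canMergeAndGetSchema returns the common Parquet schema, or null when any
// requirement (format, sort order, delete files, schema equality) is not met.
MessageType mergedSchema = canMergeAndGetSchema(group);
if (mergedSchema == null) {
  LOG.info("Row-group merge requirements not met for group {}; falling back to standard rewrite",
      group.info());
  return doStandardRewrite(group);  // placeholder for the existing read-rewrite path
}
return mergeWholeGroup(group, mergedSchema);  // merge the planner-sized group into one file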

parquetOutputFile,
schema,
ParquetFileWriter.Mode.CREATE,
ParquetWriter.DEFAULT_BLOCK_SIZE,
Contributor:

I'm not very sure about this part. Should it be fixed to ParquetWriter.DEFAULT_BLOCK_SIZE, or does it need to be linked to the table property write.parquet.row-group-size-bytes?

@shangxinli (Contributor, Author) replied Nov 16, 2025:

I feel using the property gives more flexibility for the rewriter, but I'd like to hear others' thoughts.
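
For context, one possible way to wire the table property in, using existing Iceberg helpers (sketch only, keeping the currently hard-coded value as the fallback):

import java.util.Map;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.parquet.hadoop.ParquetWriter;

class RowGroupSizeSketch {
  // Resolves write.parquet.row-group-size-bytes from table.properties(),
  // falling back to ParquetWriter.DEFAULT_BLOCK_SIZE when it is not set.
  static long rowGroupSize(Map<String, String> tableProperties) {
    return PropertyUtil.propertyAsLong(
        tableProperties,
        TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
        ParquetWriter.DEFAULT_BLOCK_SIZE);
  }
}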

@Guosmilesmile (Contributor) commented Nov 12, 2025

Thanks for the PR! I have a question about lineage: if the merging is only performed at the Parquet layer, will the lineage information of the v3 table be disrupted?

@shangxinli (Contributor, Author)

Good question! The lineage information for v3 tables is preserved in two ways:

  1. Field IDs (Schema Lineage)

Field IDs are preserved because we strictly enforce identical schemas across all files being merged.

In ParquetFileMerger.java:130-136, we validate that all input files have exactly the same Parquet MessageType schema:

if (!schema.equals(currentSchema)) {
  throw new IllegalArgumentException(
      String.format(
          "Schema mismatch detected: file '%s' has schema %s but file '%s' has schema %s. "
              + "All files must have identical Parquet schemas for row-group level merging.",
          ...));
}

Field IDs are stored directly in the Parquet schema structure itself (via Type.getId()), so when we copy row groups using ParquetFileWriter.appendFile() with the validated schema, all field IDs are preserved.

  2. Row IDs (Row Lineage for v3+)

Row IDs are automatically assigned by Iceberg's commit framework - we don't need special handling in the merger.

Here's how it works:

  1. Our code creates DataFile objects with metrics (including recordCount) but without firstRowId - see SparkParquetFileMergeRunner.java:236-243
  2. During commit, SnapshotProducer creates a ManifestListWriter initialized with base.nextRowId() (the table's current row ID counter) - see SnapshotProducer.java:273
  3. ManifestListWriter.prepare() automatically assigns firstRowId to each manifest and increments the counter by the number of rows - see ManifestListWriter.java:136-140:
    // assign first-row-id and update the next to assign
    wrapper.wrap(manifest, nextRowId);
    this.nextRowId += manifest.existingRowsCount() + manifest.addedRowsCount();
  4. The snapshot is committed with the updated nextRowId, ensuring all row IDs are correctly tracked

This is the same mechanism used by all Iceberg write operations, so row lineage is fully preserved for v3 tables.

verifyInitialVirtualRowIds(binpackTable);
long binpackCountBefore = currentData().size();

RewriteDataFiles.Result binpackResult =
Contributor:

Shouldn't we split these tests based on useParquetFileMerger? Why do we cram USE_PARQUET_ROW_GROUP_MERGE true and false in the same test?

Contributor (Author):

It is to compare that both approaches produce the same results. Added an assume statement so it runs only once.

Contributor (Author):

Let's use useParquetFileMerger

Contributor:

We can run them as separate tests based on useParquetFileMerger, check that the correct merger is used, and then check that the outcome is correct. We don't need to compare the results between the 2 runs; we could just compare them to the expected result.

Am I missing something?

public void testParquetFileMergerPreservesPhysicalRowIds() throws IOException {
// Test scenario 2: Tables with physical _row_id column
// After merging, the physical _row_id should be preserved (not changed)
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
Contributor:

Do we want this test to run 2 times based on useParquetFileMerger? Shall we just use an assume and run only with useParquetFileMerger set to true?

Contributor:

Or if we decide it is worth testing the useParquetFileMerger false case as well, we could just set USE_PARQUET_ROW_GROUP_MERGE based on useParquetFileMerger.
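
For illustration, the parameterized form could look like this (basicRewrite, table, and useParquetFileMerger come from the test context in the diff above):

RewriteDataFiles.Result result =
    basicRewrite(table)
        .option(
            RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE,
            String.valueOf(useParquetFileMerger))
        .execute();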

Contributor (Author):

Let's use useParquetFileMerger

// Merge using ParquetFileMerger - should handle all partitions correctly
RewriteDataFiles.Result result =
basicRewrite(table)
.option(RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, "true")
Contributor:

Please think through all of the cases where we explicitly set USE_PARQUET_ROW_GROUP_MERGE. Should we set it based on useParquetFileMerger? If we don't want to set it based on useParquetFileMerger, do we need to run the tests 2 times with useParquetFileMerger true and false as well?

Contributor (Author):

Agree. Let's use useParquetFileMerger.

Comment on lines 124 to 133
// This validation would normally be done in SparkParquetFileMergeRunner.canMergeAndGetSchema
// but we're testing the sort order check that happens before calling ParquetFileMerger
// Since table has sort order, validation should fail early
if (table.sortOrder().isSorted()) {
// Should fail due to sort order
assertThat(true).isTrue();
} else {
// If we got here, the sort order check didn't work
assertThat(false).isTrue();
}
Contributor:

I don't get this.
What do we test here?

Contributor (Author):

Removed.

@pvary (Contributor) commented Jan 6, 2026

@shangxinli: Back from the holidays! I'm fairly happy with the production part of the code. Left a few small comments. I'm happy that @SourabhBadhya is checking the Spark side.

There are still a few questions wrt the tests. Please check them if you have time.

Thanks, and happy new year 🎉 !

@shangxinli force-pushed the rewrite_data_files2 branch from 00df946 to 45b0197 on January 7, 2026 03:06
1. Remove Spark-specific javadoc constraints from ParquetFileMerger
   - Removed "Files must not have associated delete files" constraint
   - Removed "Table must not have a sort order" constraint
   - These validations are only enforced in SparkParquetFileMergeRunner,
     not in the ParquetFileMerger class itself

2. Fix code style in TestParquetFileMerger
   - Replace 'var' with explicit types (Parquet.DataWriteBuilder, DataWriter<Record>)
   - Add newlines after for loop and try-catch blocks for better readability
   - Remove unused Parquet import

3. Optimize test execution in TestRewriteDataFilesAction
   - Add assumeThat for comparison tests to run once instead of twice
   - Use String.valueOf(useParquetFileMerger) for regular tests to test both approaches
   - Remove redundant testParquetFileMergerExplicitlyEnabledAndDisabled test

4. Fix TestSparkParquetFileMergeRunner to actually call canMergeAndGetSchema
   - Changed canMergeAndGetSchema from private to package-private in SparkParquetFileMergeRunner
   - Updated all tests to create runner instance and call canMergeAndGetSchema()
   - Removed 4 trivial tests (description, inheritance, validOptions, init)
   - All remaining tests now validate actual canMergeAndGetSchema behavior

private static MessageType readSchema(InputFile inputFile) throws IOException {
try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(inputFile))) {
return reader.getFooter().getFileMetaData().getSchema();
Contributor:

nit: use reader.getFileMetaData().getSchema().

try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(inputFile))) {
// Read metadata from the first file
if (extraMetadata == null) {
extraMetadata = reader.getFooter().getFileMetaData().getKeyValueMetaData();
@SourabhBadhya (Contributor) commented Jan 9, 2026:

nit: use reader.getFileMetaData().getKeyValueMetaData()

shouldHaveFiles(table, 4);

// Test that binPack() respects the configuration option
// When enabled, should use SparkParquetFileMergeRunner
Contributor:

How do we check that the correct merger is used?

Comment on lines +2857 to +2863
// Add more data to create additional files
writeRecords(2, SCALE);
shouldHaveFiles(table, dataFilesAfterFirstMerge.size() + 2);

long countBefore = currentData().size();

// Second merge: should preserve physical row IDs via binary copy
Contributor:

Is this testing a different feature? If so, this could be a separate test.
The first test is for generating the _row_ids, the second is copying the _row_ids.

Do I understand the intention correctly?


@TestTemplate
public void testRowLineageWithPartitionedTable() throws IOException {
// Test that row lineage preservation works correctly with partitioned tables
Contributor:

These are descriptions of what we are testing. They should be a comment on the test method and not a comment in the first line of the test method.
Please check and fix this in the other cases as well.


@TestTemplate
public void testRowLineageWithLargerScale() throws IOException {
// Test row lineage preservation with larger number of files
Contributor:

Why is this test important as a unit test?
Do we have multiple groups, and are we checking that we are able to handle multiple groups correctly?
If so, we should check that we actually had multiple groups.


@TestTemplate
public void testRowLineageConsistencyAcrossMultipleMerges() throws IOException {
// Test that row lineage (row IDs) are preserved across multiple merge operations
Contributor:

I think it is enough to test:

  • Generation of the _row_id and related columns
  • Copy of the _row_id and related columns
  • Copy of the data columns if _row_id is not needed

This kind of test seems like a duplication which uses time and resources for limited coverage gain

Comment on lines +79 to +86
when(parquetFile1.format()).thenReturn(FileFormat.PARQUET);
when(parquetFile1.specId()).thenReturn(0);
when(parquetFile1.fileSizeInBytes()).thenReturn(100L);
when(parquetFile1.path()).thenReturn(tableLocation + "/data/file1.parquet");
when(group.rewrittenFiles()).thenReturn(Sets.newHashSet(parquetFile1));
when(group.expectedOutputFiles()).thenReturn(1);
when(group.maxOutputFileSize()).thenReturn(Long.MAX_VALUE);
when(group.fileScanTasks()).thenReturn(Collections.emptyList());
Contributor:

This is more like a question:

  • In other tests we create DataFiles and Tasks instead of mocks. We have had several occasions where we had issues with mocks, so we try to avoid them, or limit the places where we use them as much as possible.

Maybe something like FileGenerationUtil.generateDataFile, or TestBase.FILE_A for the files?
Also for the tasks we might be able to use MockFileScanTask.mockTask?
We could also create real RewriteFileGroup objects instead of mocks.

}

@Test
public void testCanMergeAndGetSchemaReturnsFalseForSortedTable() {
@pvary (Contributor) commented Jan 9, 2026:

Should this be testCanMergeAndGetSchemaReturnsNullForSortedTable? Notice Null instead of False.

}

@Test
public void testCanMergeAndGetSchemaReturnsFalseForFilesWithDeleteFiles() {
Contributor:

Should this be testCanMergeAndGetSchemaReturnsNullForFilesWithDeleteFiles ? Notice Null instead of False.

}

@Test
public void testCanMergeAndGetSchemaReturnsFalseForTableWithMultipleColumnSort() {
Contributor:

Should this be testCanMergeAndGetSchemaReturnsNullForTableWithMultipleColumnSort ? Notice Null instead of False.

Comment on lines +158 to +159
// Note: ParquetFileMerger.canMergeAndGetSchema would return null because
// mock files don't exist on disk, but the initial validation checks all pass
Contributor:

Then we need another check, or we should create the files in the tests

}

@Test
public void testCanMergeReturnsFalseForNonParquetFile() throws IOException {
Contributor:

Rename the tests so they use ReturnsNull instead of ReturnsFalse.


@Test
public void testMergeFilesWithoutRowLineage() throws IOException {
// Test that merging without firstRowIds/dataSequenceNumbers works (no row lineage columns)
Contributor:

These comments should be a method comment instead of a "random" comment at the beginning of the method.

}

@Test
public void testCanMergeReturnsFalseForPhysicalRowLineageWithNulls() throws IOException {
Contributor:

Could we move this test next to the other testCanMerge tests? And don't forget to change False to Null

}

@Test
public void testCanMergeReturnsFalseForSameFieldNameDifferentType() throws IOException {
Contributor:

Again, move this test next to the other testCanMergeReturns tests.

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestParquetFileMerger {
Contributor:

nit: The test classes don't have to be public. Make all of them package private

}

@Test
public void testCanMergeThrowsForEmptyList() {
Contributor:

Please organize the test methods a bit.
Move every testCanMerge to one place, and every testMerge after.
