@@ -992,7 +992,8 @@ where
992992 G : Scope < Timestamp = Timestamp > ,
993993{
994994 let scope = input. scope ( ) ;
995- let name_for_logging = name. clone ( ) ;
995+ let worker_id = scope. index ( ) ;
996+ let name_for_logging = format ! ( "{name}-worker-{worker_id}" ) ;
996997 let mut builder = OperatorBuilder :: new ( name, scope. clone ( ) ) ;
997998
998999 let ( output, output_stream) = builder. new_output :: < CapacityContainerBuilder < _ > > ( ) ;
@@ -1354,16 +1355,19 @@ where
13541355 ) ;
13551356 let mut max_upper = Antichain :: from_elem ( Timestamp :: minimum ( ) ) ;
13561357 for ( desc, mut delta_writer) in ready_batches {
1358+ let instant = Instant :: now ( ) ;
13571359 let data_files = delta_writer
13581360 . close ( )
13591361 . await
13601362 . context ( "Failed to close DeltaWriter" ) ?;
1363+ let duration = instant. elapsed ( ) ;
13611364 debug ! (
1362- "{}: closed batch [{}, {}), wrote {} files" ,
1365+ "{}: closed batch [{}, {}), wrote {} files in {:?} " ,
13631366 name_for_logging,
13641367 desc. 0 . pretty( ) ,
13651368 desc. 1 . pretty( ) ,
1366- data_files. len( )
1369+ data_files. len( ) ,
1370+ duration
13671371 ) ;
13681372 for data_file in data_files {
13691373 match data_file. content_type ( ) {
@@ -1534,43 +1538,90 @@ where
15341538 }
15351539 } ) ;
15361540
1537- for batch in done_batches {
1538- let file_set = batch_descriptions. remove ( & batch) . unwrap ( ) ;
1539-
1541+ let mut done_iter = done_batches. into_iter ( ) . peekable ( ) ;
1542+ while let Some ( batch) = done_iter. next ( ) {
15401543 let mut data_files = vec ! [ ] ;
15411544 let mut delete_files = vec ! [ ] ;
15421545 // Track totals for committed statistics
15431546 let mut total_messages: u64 = 0 ;
15441547 let mut total_bytes: u64 = 0 ;
1545- for file in file_set. data_files {
1546- total_messages += file. data_file ( ) . record_count ( ) ;
1547- total_bytes += file. data_file ( ) . file_size_in_bytes ( ) ;
1548- match file. data_file ( ) . content_type ( ) {
1549- iceberg:: spec:: DataContentType :: Data => {
1550- data_files. push ( file. into_data_file ( ) ) ;
1548+ let mut empty_batch_count: Option < usize > = None ;
1549+ let is_empty = batch_descriptions
1550+ . get ( & batch)
1551+ . map ( |set| set. data_files . is_empty ( ) )
1552+ . unwrap_or ( false ) ;
1553+ let ( commit_lower, commit_upper) = if is_empty {
1554+ // Optimization: coalesce contiguous empty batches into a single commit
1555+ // that advances the frontier to the last empty upper. This avoids a long
1556+ // sequence of zero-file Iceberg commits during snapshots.
1557+ let mut upper = batch. 1 . clone ( ) ;
1558+ let mut count = 1usize ;
1559+ batch_descriptions
1560+ . remove ( & batch)
1561+ . expect ( "batch exists in descriptions" ) ;
1562+ while let Some ( next_batch) = done_iter. peek ( ) {
1563+ let next_empty = batch_descriptions
1564+ . get ( next_batch)
1565+ . map ( |set| set. data_files . is_empty ( ) )
1566+ . unwrap_or ( false ) ;
1567+ if next_batch. 0 != upper || !next_empty {
1568+ break ;
15511569 }
1552- iceberg:: spec:: DataContentType :: PositionDeletes |
1553- iceberg:: spec:: DataContentType :: EqualityDeletes => {
1554- delete_files. push ( file. into_data_file ( ) ) ;
1570+ let next_batch = done_iter. next ( ) . expect ( "peeked batch" ) ;
1571+ batch_descriptions
1572+ . remove ( & next_batch)
1573+ . expect ( "batch exists in descriptions" ) ;
1574+ upper = next_batch. 1 . clone ( ) ;
1575+ count += 1 ;
1576+ }
1577+ empty_batch_count = Some ( count) ;
1578+ ( batch. 0 . clone ( ) , upper)
1579+ } else {
1580+ let file_set = batch_descriptions
1581+ . remove ( & batch)
1582+ . expect ( "batch exists in descriptions" ) ;
1583+ for file in file_set. data_files {
1584+ total_messages += file. data_file ( ) . record_count ( ) ;
1585+ total_bytes += file. data_file ( ) . file_size_in_bytes ( ) ;
1586+ match file. data_file ( ) . content_type ( ) {
1587+ iceberg:: spec:: DataContentType :: Data => {
1588+ data_files. push ( file. into_data_file ( ) ) ;
1589+ }
1590+ iceberg:: spec:: DataContentType :: PositionDeletes |
1591+ iceberg:: spec:: DataContentType :: EqualityDeletes => {
1592+ delete_files. push ( file. into_data_file ( ) ) ;
1593+ }
15551594 }
15561595 }
1557- }
1596+ ( batch. 0 . clone ( ) , batch. 1 . clone ( ) )
1597+ } ;
15581598
1559- debug ! (
1560- ?sink_id,
1561- %name_for_logging,
1562- lower = %batch. 0 . pretty( ) ,
1563- upper = %batch. 1 . pretty( ) ,
1564- data_files = data_files. len( ) ,
1565- delete_files = delete_files. len( ) ,
1566- total_messages,
1567- total_bytes,
1568- "iceberg commit applying batch"
1569- ) ;
1599+ if let Some ( empty_batches) = empty_batch_count {
1600+ debug ! (
1601+ ?sink_id,
1602+ %name_for_logging,
1603+ lower = %commit_lower. pretty( ) ,
1604+ upper = %commit_upper. pretty( ) ,
1605+ empty_batches,
1606+ "iceberg commit applying empty batch run"
1607+ ) ;
1608+ } else {
1609+ debug ! (
1610+ ?sink_id,
1611+ %name_for_logging,
1612+ lower = %commit_lower. pretty( ) ,
1613+ upper = %commit_upper. pretty( ) ,
1614+ data_files = data_files. len( ) ,
1615+ delete_files = delete_files. len( ) ,
1616+ total_messages,
1617+ total_bytes,
1618+ "iceberg commit applying batch"
1619+ ) ;
1620+ }
15701621
15711622 let instant = Instant :: now ( ) ;
15721623
1573- let frontier = batch . 1 . clone ( ) ;
1624+ let frontier = commit_upper . clone ( ) ;
15741625 let tx = Transaction :: new ( & table) ;
15751626
15761627 let frontier_json = serde_json:: to_string ( & frontier. elements ( ) )
@@ -1640,16 +1691,30 @@ where
16401691
16411692 let duration = instant. elapsed ( ) ;
16421693
1643- debug ! (
1644- ?sink_id,
1645- %name_for_logging,
1646- lower = %batch. 0 . pretty( ) ,
1647- upper = %batch. 1 . pretty( ) ,
1648- total_messages,
1649- total_bytes,
1650- ?duration,
1651- "iceberg commit applied batch"
1652- ) ;
1694+ if let Some ( empty_batches) = empty_batch_count {
1695+ debug ! (
1696+ ?sink_id,
1697+ %name_for_logging,
1698+ lower = %commit_lower. pretty( ) ,
1699+ upper = %commit_upper. pretty( ) ,
1700+ empty_batches,
1701+ total_messages,
1702+ total_bytes,
1703+ ?duration,
1704+ "iceberg commit applied empty batch run"
1705+ ) ;
1706+ } else {
1707+ debug ! (
1708+ ?sink_id,
1709+ %name_for_logging,
1710+ lower = %commit_lower. pretty( ) ,
1711+ upper = %commit_upper. pretty( ) ,
1712+ total_messages,
1713+ total_bytes,
1714+ ?duration,
1715+ "iceberg commit applied batch"
1716+ ) ;
1717+ }
16531718
16541719 metrics. snapshots_committed . inc ( ) ;
16551720 statistics. inc_messages_committed_by ( total_messages) ;
0 commit comments