Collapse repeated collectAsList in DataLayoutStrategyGenerator #578

Draft
kamanavishnu wants to merge 1 commit into linkedin:main from kamanavishnu:optimize-datalayout-strategy-driver-aggregate

kamanavishnu (Collaborator) commented May 11, 2026

Summary

buildDataLayoutStrategy in OpenHouseDataLayoutStrategyGenerator issues 4 Spark actions to characterize the file mix (one count + three collectAsList via computeFileStats), and computeEntropy issues 3 more (count + toLocalIterator + count). Each action incurs Spark job-scheduling overhead, even when the underlying data is cached.

This PR collects the Dataset<FileStat> once per buildDataLayoutStrategy call and aggregates in driver memory, replacing 7 actions with 1.

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

Details

In OpenHouseDataLayoutStrategyGenerator.java:

  • New aggregateInMemory(List<FileStat>, Predicate<FileStat>) helper sums (totalBytes, count, totalRecords) over rows matching a predicate. Pure driver-side, no Spark action.
  • buildDataLayoutStrategy now does a single fileStats.collectAsList() and feeds it to three aggregateInMemory calls for DATA-small, POSITION_DELETES, EQUALITY_DELETES tuples.
  • computeEntropy(Dataset<Long>) -> computeEntropy(List<Long>): single in-memory pass, eliminates the prior 3 actions.
  • dataFileSizes (used by computeEntropy) is derived from the same collected list instead of issuing dataFiles.map(...) separately.
  • Drop now-unused imports (Iterator, Stream, MapFunction, Encoders); add Predicate, Collectors.
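The driver-side aggregation described in the list above could look roughly like the following. This is a minimal sketch, not the code from the PR: the FileStat class here is a simplified, hypothetical stand-in for the real one, and the Totals holder, the field names, and the "small file" threshold are assumptions made only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Sketch of driver-side aggregation over an already-collected list. */
final class AggregateInMemorySketch {

  /** Hypothetical, simplified stand-in for the real FileStat bean. */
  static final class FileStat {
    final String content; // "DATA", "POSITION_DELETES" or "EQUALITY_DELETES"
    final long sizeBytes;
    final long recordCount;

    FileStat(String content, long sizeBytes, long recordCount) {
      this.content = content;
      this.sizeBytes = sizeBytes;
      this.recordCount = recordCount;
    }
  }

  /** Hypothetical holder for the (totalBytes, count, totalRecords) tuple. */
  static final class Totals {
    long totalBytes;
    long count;
    long totalRecords;
  }

  /** Sums the tuple over rows matching the predicate; no Spark action involved. */
  static Totals aggregateInMemory(List<FileStat> stats, Predicate<FileStat> filter) {
    Totals totals = new Totals();
    for (FileStat stat : stats) {
      if (filter.test(stat)) {
        totals.totalBytes += stat.sizeBytes;
        totals.count++;
        totals.totalRecords += stat.recordCount;
      }
    }
    return totals;
  }

  public static void main(String[] args) {
    // Stands in for the result of the single fileStats.collectAsList() call.
    List<FileStat> collected = Arrays.asList(
        new FileStat("DATA", 100L, 10L),
        new FileStat("DATA", 900L, 90L),
        new FileStat("POSITION_DELETES", 50L, 5L));
    long smallFileThreshold = 500L; // assumed cutoff for "small" data files

    Totals smallData = aggregateInMemory(
        collected, s -> s.content.equals("DATA") && s.sizeBytes < smallFileThreshold);
    Totals positionDeletes = aggregateInMemory(
        collected, s -> s.content.equals("POSITION_DELETES"));

    // Data file sizes derived from the same list, not from a separate Dataset map.
    List<Long> dataFileSizes = collected.stream()
        .filter(s -> s.content.equals("DATA"))
        .map(s -> s.sizeBytes)
        .collect(Collectors.toList());

    System.out.println(smallData.totalBytes + " bytes in " + smallData.count + " small data files");
    System.out.println(positionDeletes.count + " position delete file(s)");
    System.out.println(dataFileSizes.size() + " data file sizes");
  }
}
```

Each call is a linear scan of the same in-memory list, so the three tuples cost three cheap passes on the driver instead of three Spark jobs.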

Action-count impact (per buildDataLayoutStrategy)

| Action type | Before | After |
| --- | --- | --- |
| count actions | 1 | 0 |
| collectAsList actions | 3 (in computeFileStats) | 1 |
| computeEntropy actions | 3 (count + toLocalIterator + count) | 0 |
| Total | 7 | 1 |

Driver memory

The aggregation runs entirely in driver memory after collectAsList. The peak footprint is one List<FileStat>, the same as today, since the old code already invoked collectAsList three times on the same Dataset. No new OOM mode is introduced: tables that already fit comfortably in the driver continue to fit, and tables that did not fit still fail at the same point as before.

Testing Done

  • Manually Tested on local docker setup. Please include commands ran, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

The change is intended to be functionally equivalent. Public API and every observable output (every DataLayoutStrategy field: gain, cost, score, entropy, delete-file bytes/counts/records, fileCountReductionPenalty, config) are unchanged and exercised by the existing suite:

  • ./gradlew :libs:datalayout:test - all 17 tests pass, including:
    • OpenHouseDataLayoutStrategyGeneratorTest.testTableLevelStrategy
    • OpenHouseDataLayoutStrategyGeneratorTest.testTableLevelStrategyPartitioned
    • OpenHouseDataLayoutStrategyGeneratorTest.testPartitionLevelStrategy
    • IntegrationTest.testCompactionStrategyGenerationNonPartitioned
    • IntegrationTest.testCompactionStrategyGenerationWithPersistencePartitioned

Additional Information

  • Breaking Changes
  • Deprecations
  • Large PR broken into smaller PRs, and PR plan linked in the description.

buildDataLayoutStrategy previously issued 4 separate Spark actions to
characterize the file mix:
  - filteredDataFiles.count() to detect "no work"
  - computeFileStats(DATA) -> collectAsList
  - computeFileStats(POSITION_DELETES) -> collectAsList
  - computeFileStats(EQUALITY_DELETES) -> collectAsList
and computeEntropy issued 3 more (count, toLocalIterator, count) on a
freshly mapped Dataset<Long>.

This change collects fileStats once and aggregates in the driver:

  - New aggregateInMemory(List<FileStat>, Predicate<FileStat>) helper
    sums (totalBytes, count, totalRecords) over rows matching a
    predicate; pure driver-side, issues no Spark action.
  - buildDataLayoutStrategy now does a single fileStats.collectAsList()
    and feeds it to aggregateInMemory for DATA-small, POSITION_DELETES,
    and EQUALITY_DELETES tuples.
  - computeEntropy now takes List<Long> and computes the mean-squared
    error in one in-memory pass, eliminating the prior 3 actions.
  - dataFileSizes (used by computeEntropy) is derived from the same
    collected list instead of issuing dataFiles.map(...) separately.
  - Drops the now-unused Iterator, Stream, MapFunction, Encoders
    imports; adds Predicate, Collectors.
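
A single-pass, in-memory mean-squared-error computation over a List<Long> might be sketched as follows. This is only an illustration under stated assumptions: the exact error definition used by the generator is not shown in this description, so the sketch assumes the error is each file's squared distance from a target file size, and the targetFileSize parameter and the empty-list behavior are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch of a one-pass mean-squared-error computation on the driver. */
final class EntropySketch {

  /**
   * Mean of squared differences between each file size and the target size.
   * Assumption: the target size is passed in; the real method signature takes
   * only the list. An empty list yields 0.0 to avoid dividing by zero.
   */
  static double computeEntropy(List<Long> fileSizes, long targetFileSize) {
    if (fileSizes.isEmpty()) {
      return 0.0;
    }
    double sumSquared = 0.0;
    for (long size : fileSizes) {
      double diff = (double) targetFileSize - size;
      sumSquared += diff * diff;
    }
    // The count comes from the list itself, so no separate count() action is needed.
    return sumSquared / fileSizes.size();
  }

  public static void main(String[] args) {
    List<Long> sizes = Arrays.asList(2L, 4L);
    System.out.println(computeEntropy(sizes, 4L));
  }
}
```

Because the list is already materialized, both the element count and the running sum come from one loop, which is what removes the count, toLocalIterator, and second count actions.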

Driver memory: peak is bounded by the size of the FileStat list, which
is the same as today (the old code already collected the same Dataset
three times). The new code collects once. No new OOM mode.

Functionally equivalent; existing unit and integration tests pass.