Collapse repeated collectAsList in DataLayoutStrategyGenerator#578
Draft
kamanavishnu wants to merge 1 commit into
Draft
Collapse repeated collectAsList in DataLayoutStrategyGenerator#578kamanavishnu wants to merge 1 commit into
kamanavishnu wants to merge 1 commit into
Conversation
buildDataLayoutStrategy previously issued 4 separate Spark actions to
characterize the file mix:
- filteredDataFiles.count() to detect "no work"
- computeFileStats(DATA) -> collectAsList
- computeFileStats(POSITION_DELETES) -> collectAsList
- computeFileStats(EQUALITY_DELETES) -> collectAsList
and computeEntropy issued 3 more (count, toLocalIterator, count) on a
freshly mapped Dataset<Long>.
This change collects fileStats once and aggregates in the driver:
- New aggregateInMemory(List<FileStat>, Predicate<FileStat>) helper
sums (totalBytes, count, totalRecords) over rows matching a
predicate; pure driver-side, issues no Spark action.
- buildDataLayoutStrategy now does a single fileStats.collectAsList()
and feeds it to aggregateInMemory for DATA-small, POSITION_DELETES,
and EQUALITY_DELETES tuples.
- computeEntropy now takes List<Long> and computes the mean-squared
error in one in-memory pass, eliminating the prior 3 actions.
- dataFileSizes (used by computeEntropy) is derived from the same
collected list instead of issuing dataFiles.map(...) separately.
- Drops the now-unused Iterator, Stream, MapFunction, Encoders
imports; adds Predicate, Collectors.
Driver memory: peak is bounded by the size of the FileStat list, which
is the same as today (the old code already collected the same Dataset
three times). The new code collects once. No new OOM mode.
Functionally equivalent; existing unit and integration tests pass.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
buildDataLayoutStrategyinOpenHouseDataLayoutStrategyGeneratorissues 4 Spark actions to characterize the file mix (onecount+ threecollectAsListviacomputeFileStats), andcomputeEntropyissues 3 more (count+toLocalIterator+count). Each action has Spark scheduling overhead even when the underlying data is cached.This PR collects the
Dataset<FileStat>once perbuildDataLayoutStrategycall and aggregates in driver memory, replacing 7 actions with 1.Changes
Details
In
OpenHouseDataLayoutStrategyGenerator.java:aggregateInMemory(List<FileStat>, Predicate<FileStat>)helper sums(totalBytes, count, totalRecords)over rows matching a predicate. Pure driver-side, no Spark action.buildDataLayoutStrategynow does a singlefileStats.collectAsList()and feeds it to threeaggregateInMemorycalls for DATA-small, POSITION_DELETES, EQUALITY_DELETES tuples.computeEntropy(Dataset<Long>)->computeEntropy(List<Long>): single in-memory pass, eliminates the prior 3 actions.dataFileSizes(used bycomputeEntropy) is derived from the same collected list instead of issuingdataFiles.map(...)separately.Iterator,Stream,MapFunction,Encoders); addPredicate,Collectors.Action-count impact (per
buildDataLayoutStrategy)countactionscollectAsListactionscomputeFileStats)computeEntropyactionscount+toLocalIterator+count)Driver memory
The aggregation runs entirely in driver memory after
collectAsList. The peak footprint equals oneList<FileStat>- which is the same as today, since the old code already invokedcollectAsListthree times on the same Dataset. No new OOM mode is introduced. Tables that already fit comfortably in the driver continue to; tables that didn't fail at the same point as before.Testing Done
The change is intended to be functionally equivalent. Public API and every observable output (every
DataLayoutStrategyfield:gain,cost,score,entropy, delete-file bytes/counts/records,fileCountReductionPenalty,config) are unchanged and exercised by the existing suite:./gradlew :libs:datalayout:test- all 17 tests pass, including:OpenHouseDataLayoutStrategyGeneratorTest.testTableLevelStrategyOpenHouseDataLayoutStrategyGeneratorTest.testTableLevelStrategyPartitionedOpenHouseDataLayoutStrategyGeneratorTest.testPartitionLevelStrategyIntegrationTest.testCompactionStrategyGenerationNonPartitionedIntegrationTest.testCompactionStrategyGenerationWithPersistencePartitionedAdditional Information