Optimizer: Batched orphan file deletion using bin packing#599
Optimizer: Batched orphan file deletion using bin packing#599abhisheknath2011 wants to merge 3 commits into
Conversation
| .connectTimeout(10, TimeUnit.SECONDS) | ||
| .readTimeout(30, TimeUnit.SECONDS) | ||
| .writeTimeout(30, TimeUnit.SECONDS) |
There was a problem hiding this comment.
Ideally these would be configs.
| private static String stripTrailingSlash(String url) { | ||
| if (url == null || url.isEmpty()) { | ||
| throw new IllegalArgumentException("Optimizer Service base URL must be non-empty"); | ||
| } | ||
| return url.endsWith("/") ? url.substring(0, url.length() - 1) : url; | ||
| } |
There was a problem hiding this comment.
(1) Seems heavyweight
(2) Can we just assume not null or attempt to use optional instead if null might be present? Under what conditions is the url valid to be null?
| private final List<BatchEntry> entries; | ||
| private final String resultsEndpoint; | ||
| private final int driverParallelism; | ||
| private final long ttlSeconds; | ||
| private final String backupDir; | ||
| private final int concurrentDeletes; | ||
| private final boolean streamResults; | ||
| private final int maxOrphanFileSampleSize; |
There was a problem hiding this comment.
Thoughts on a OFD parameters object that we use a builder to construct? The main thought was to encapsulate parameters rather than having them be manually maintained. I'm generally a fan of generating with annotations to avoid this boilerplate as lines 62-92 are all just defining parameters and a public funciton to supply them.
| int concurrentDeletes, | ||
| boolean streamResults, | ||
| int maxOrphanFileSampleSize) { | ||
| super(jobId, stateManager, otelEmitter); |
There was a problem hiding this comment.
There is a callback to complete job on HTS right? Do we need to adapt that or is it fine to leave as-is?
There was a problem hiding this comment.
Inherited from BaseSparkApp.run(): stateManager.updateState(jobId, SUCCEEDED/FAILED) already fires via onStarted/onFinished plus heartbeats — HTS sees this job's lifecycle. The new optimizer-side callbacks are per-operation (per-table); HTS callback is per-job. Both are intentional and orthogonal.
| try { | ||
| client.updateOperation(body); | ||
| } catch (IOException e) { | ||
| log.error( |
There was a problem hiding this comment.
counter? We can get signal on how many jobs are failing to emit metrics.
There was a problem hiding this comment.
Yes emitting counter metrics and logging error.
| private int countOrphans(DeleteOrphanFiles.Result result) { | ||
| int count = 0; | ||
| for (String unused : result.orphanFileLocations()) { | ||
| count++; | ||
| } | ||
| return count; | ||
| } | ||
| } |
There was a problem hiding this comment.
Can we do result.count()?
There was a problem hiding this comment.
Use iterables count to reduce driver memory usage as we don't need full path materialization.
| if (tableNames == null | ||
| || operationIds == null | ||
| || tableUuids == null | ||
| || tableNames.isEmpty() |
There was a problem hiding this comment.
Is there a practical limit to the number of tables in the job based on the input string length limits?
## Optimizer Stack | PR | Content | |---|---| | #527 | Data Model | | #530 | Database Repos | | #531 | REST service | | #533 **(this)** | Analyzer app | | #534 | Scheduler app | | #599 | Spark BatchedOFD app | | #tbd | Infra, docker-compose, smoke test | ## Summary PR 3 of N in the optimizer stack. Introduces `apps/optimizer-analyzer`, a Spring Boot CommandLineRunner that evaluates every table in `table_stats` against pluggable `OperationAnalyzer` strategies. The first strategy, `OrphanFilesDeletionAnalyzer`, schedules OFD operations with 24h success / 1h failure retry cadence, a 6h SCHEDULED timeout, and a 5-strike circuit breaker. Key design choices: - Bulk-loads operations and history into maps (one query per type), then iterates the stats list — O(types) queries, not O(tables). - Uses the existing generic `find()` repository methods with null params. - Pure unit tests with Mockito — no Spring context needed. ## Changes - [ ] Client-facing API Changes - [ ] Internal API Changes - [ ] Bug Fixes - [x] New Features - [ ] Performance Improvements - [ ] Code Style - [ ] Refactoring - [ ] Documentation - [x] Tests **Core**: `AnalyzerRunner` — loads table_stats, pre-loads operations and history into maps, evaluates each table against all analyzers, circuit breaker logic. **Strategy interface**: `OperationAnalyzer` — `isEnabled(table)`, `shouldSchedule(table, currentOp, latestHistory)`, `getCircuitBreakerThreshold()`. **Cadence policy**: `CadencePolicy` — encapsulates time-based retry logic shared across operation types. **OFD analyzer**: `OrphanFilesDeletionAnalyzer` — enabled via `maintenance.optimizer.ofd.enabled` table property. ## Testing Done - [ ] Manually Tested on local docker setup. Please include commands ran, and their output. - [x] Added new tests for the changes made. - [ ] Updated existing tests to reflect the changes made. - [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help. - [ ] Some other form of testing like staging or soak time in production. Please explain. 25 unit tests: - `AnalyzerRunnerTest` (7 tests) — eligible table insertion, cadence skip, disabled table, shouldSchedule=false, null UUID, circuit breaker trip, below-threshold pass - `OrphanFilesDeletionAnalyzerTest` (18 tests) — isEnabled variants, shouldSchedule for no-op/PENDING/SCHEDULING/SCHEDULED with history combinations ``` ./gradlew :apps:optimizer-analyzer:test # BUILD SUCCESSFUL — 25 tests pass ``` # Additional Information - [ ] Breaking Changes - [ ] Deprecations - [x] Large PR broken into smaller PRs, and PR plan linked in the description. --------- Co-authored-by: mkuchenbecker <mkuchenbecker@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Abhishek Nath <anath1@linkedin.com>
Summary
Introduces
BatchedOrphanFilesDeletionSparkApp, the multi-table counterpart of the existing single-tableOrphanFilesDeletionSparkApp. One Spark job now processes a list of(table, operationId)pairs that the optimizer scheduler bin-packed into a single batch, reporting SUCCESS/FAILED per operation back to the Optimizer Service.Also lands a first-fit-decreasing bin packer (
jobs.util.binpack) that the scheduler (#534) will use to assemble those batches. The packer has no caller in this PR — it ships alongside the Spark app so the algorithm can be reviewed independently from the scheduler wiring. Keeping it inapps/sparkfor now since the scheduler module isn't merged yet; it can move along side the scheduler once PR #534 is merged.Key design choices:
POST /v1/optimizer/operations/updateitself fails, the row staysSCHEDULEDand the Analyzer's stale-timeout will re-queue it. No retry storms in the Spark driver.--driverParallelismis honoured verbatim; the app does not pick its own thread count.OperationUpdateRequestis mirrored inapps/sparkso this PR compiles independently of the optimizer-service module's merge order. Replace with the shared DTO once the optimizer module lives inapps/.Bin packer (
com.linkedin.openhouse.jobs.util.binpack):-
BinItem— fqtn, operationId, tableUuid, db/table, weight (numFiles), sizeBytesBin— mutable accumulator with three-dimensional fit checkFirstFitDecreasingBinPacker— FFD by weight with secondary caps on bytes and item count; oversized items get a dedicated binOptimizer Service client (
com.linkedin.openhouse.jobs.spark.optimizer):OperationUpdateRequest— wire-compatible body forPOST /v1/optimizer/operations/updateOptimizerServiceClient— thin OkHttp client with sensible timeoutsBatched Spark app (
com.linkedin.openhouse.jobs.spark):BatchedOrphanFilesDeletionSparkApp— extendsBaseSparkApp; iterates entries via a fixed thread pool, reusesOperations.deleteOrphanFiles(...)per table, posts per-operation status, runs the existingTableStateValidatorper tableCLI:
--tableNames db.t1,db.t2,db.t3
--operationIds op-uuid-1,op-uuid-2,op-uuid-3
--tableUuids tab-uuid-1,tab-uuid-2,tab-uuid-3
--resultsEndpoint http://optimizer.svc:8080
--driverParallelism 4
plus existing OFD knobs (
--ttl,--backupDir,--concurrentDeletes,--streamResults,--maxOrphanFileSampleSize).Issue] Briefly discuss the summary of the changes made in this
pull request in 2-3 lines.
Changes
For all the boxes checked, please include additional details of the changes made in this pull request.
Testing Done
For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.
Additional Information
Breaking Changes
Deprecations
Large PR broken into smaller PRs, and PR plan linked in the description.
Open items for reviewers:
OperationUpdateRequestis a local mirror of feat(optimizer): [2/N] Optimizer REST Service and Controller #531'sUpdateOperationRequest. Should we introduce anapps/optimizershared module here (mirroring the analyzer's pattern in feat(optimizer): [3/N] Analyzer #533) and depend on it instead?apps/spark/util/binpackor move to the scheduler module in feat(optimizer): [4/N] Scheduler app #534 alongside its only caller?POST /v1/optimizer/operations/updateper feat(optimizer): [2/N] Optimizer REST Service and Controller #531. If that endpoint changes name (e.g. toPOST /{id}/complete),OptimizerServiceClient.UPDATE_PATHis the only place that needs to change.For all the boxes checked, include additional details of the changes made in this pull request.