Skip to content

Optimizer: Batched orphan file deletion using bin packing#599

Open
abhisheknath2011 wants to merge 3 commits into
linkedin:mainfrom
abhisheknath2011:batched-ofd
Open

Optimizer: Batched orphan file deletion using bin packing#599
abhisheknath2011 wants to merge 3 commits into
linkedin:mainfrom
abhisheknath2011:batched-ofd

Conversation

@abhisheknath2011
Copy link
Copy Markdown
Member

@abhisheknath2011 abhisheknath2011 commented May 22, 2026

Summary

Introduces BatchedOrphanFilesDeletionSparkApp, the multi-table counterpart of the existing single-table OrphanFilesDeletionSparkApp. One Spark job now processes a list of (table, operationId) pairs that the optimizer scheduler bin-packed into a single batch, reporting SUCCESS/FAILED per operation back to the Optimizer Service.

Also lands a first-fit-decreasing bin packer (jobs.util.binpack) that the scheduler (#534) will use to assemble those batches. The packer has no caller in this PR — it ships alongside the Spark app so the algorithm can be reviewed independently from the scheduler wiring. Keeping it in apps/spark for now since the scheduler module isn't merged yet; it can move along side the scheduler once PR #534 is merged.

Key design choices:

  • Per-table failure isolation — exceptions in one table are caught, FAILED is posted for that operationId, and remaining tables continue. The job exits 0 if at least one table succeeds.
  • Recoverable result reporting — if POST /v1/optimizer/operations/update itself fails, the row stays SCHEDULED and the Analyzer's stale-timeout will re-queue it. No retry storms in the Spark driver.
  • Scheduler decides parallelism, not the app--driverParallelism is honoured verbatim; the app does not pick its own thread count.
  • Bin packer never drops oversized tables — an item exceeding any single cap is placed in a dedicated bin rather than silently skipped.
  • Self-contained wire DTOOperationUpdateRequest is mirrored in apps/spark so this PR compiles independently of the optimizer-service module's merge order. Replace with the shared DTO once the optimizer module lives in apps/.
    Bin packer (com.linkedin.openhouse.jobs.util.binpack):
    -BinItem — fqtn, operationId, tableUuid, db/table, weight (numFiles), sizeBytes
  • Bin — mutable accumulator with three-dimensional fit check
  • FirstFitDecreasingBinPacker — FFD by weight with secondary caps on bytes and item count; oversized items get a dedicated bin

Optimizer Service client (com.linkedin.openhouse.jobs.spark.optimizer):

  • OperationUpdateRequest — wire-compatible body for POST /v1/optimizer/operations/update
  • OptimizerServiceClient — thin OkHttp client with sensible timeouts

Batched Spark app (com.linkedin.openhouse.jobs.spark):

  • BatchedOrphanFilesDeletionSparkApp — extends BaseSparkApp; iterates entries via a fixed thread pool, reuses Operations.deleteOrphanFiles(...) per table, posts per-operation status, runs the existing TableStateValidator per table

CLI:
--tableNames db.t1,db.t2,db.t3
--operationIds op-uuid-1,op-uuid-2,op-uuid-3
--tableUuids tab-uuid-1,tab-uuid-2,tab-uuid-3
--resultsEndpoint http://optimizer.svc:8080
--driverParallelism 4
plus existing OFD knobs (--ttl, --backupDir, --concurrentDeletes, --streamResults, --maxOrphanFileSampleSize).

Issue] Briefly discuss the summary of the changes made in this
pull request in 2-3 lines.

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

Testing Done

  • Manually Tested on local docker setup. Please include commands ran, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

Additional Information

For all the boxes checked, include additional details of the changes made in this pull request.

Comment on lines +69 to +71
.connectTimeout(10, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally these would be configs.

Comment on lines +75 to +80
private static String stripTrailingSlash(String url) {
if (url == null || url.isEmpty()) {
throw new IllegalArgumentException("Optimizer Service base URL must be non-empty");
}
return url.endsWith("/") ? url.substring(0, url.length() - 1) : url;
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(1) Seems heavyweight
(2) Can we just assume not null or attempt to use optional instead if null might be present? Under what conditions is the url valid to be null?

Comment on lines +62 to +69
private final List<BatchEntry> entries;
private final String resultsEndpoint;
private final int driverParallelism;
private final long ttlSeconds;
private final String backupDir;
private final int concurrentDeletes;
private final boolean streamResults;
private final int maxOrphanFileSampleSize;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on a OFD parameters object that we use a builder to construct? The main thought was to encapsulate parameters rather than having them be manually maintained. I'm generally a fan of generating with annotations to avoid this boilerplate as lines 62-92 are all just defining parameters and a public funciton to supply them.

int concurrentDeletes,
boolean streamResults,
int maxOrphanFileSampleSize) {
super(jobId, stateManager, otelEmitter);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a callback to complete job on HTS right? Do we need to adapt that or is it fine to leave as-is?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inherited from BaseSparkApp.run(): stateManager.updateState(jobId, SUCCEEDED/FAILED) already fires via onStarted/onFinished plus heartbeats — HTS sees this job's lifecycle. The new optimizer-side callbacks are per-operation (per-table); HTS callback is per-job. Both are intentional and orthogonal.

try {
client.updateOperation(body);
} catch (IOException e) {
log.error(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

counter? We can get signal on how many jobs are failing to emit metrics.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes emitting counter metrics and logging error.

Comment on lines +278 to +285
private int countOrphans(DeleteOrphanFiles.Result result) {
int count = 0;
for (String unused : result.orphanFileLocations()) {
count++;
}
return count;
}
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do result.count()?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use iterables count to reduce driver memory usage as we don't need full path materialization.

if (tableNames == null
|| operationIds == null
|| tableUuids == null
|| tableNames.isEmpty()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a practical limit to the number of tables in the job based on the input string length limits?

@mkuchenbecker mkuchenbecker mentioned this pull request May 22, 2026
17 tasks
mkuchenbecker added a commit that referenced this pull request May 27, 2026
## Optimizer Stack

| PR | Content |
|---|---|
| #527 | Data Model |
| #530 | Database Repos |
| #531 | REST service |
| #533 **(this)** | Analyzer
app |
| #534 | Scheduler app |
| #599 | Spark BatchedOFD app
|
| #tbd | Infra, docker-compose, smoke test |

## Summary

PR 3 of N in the optimizer stack.

Introduces `apps/optimizer-analyzer`, a Spring Boot CommandLineRunner
that evaluates every table in `table_stats` against pluggable
`OperationAnalyzer` strategies. The first strategy,
`OrphanFilesDeletionAnalyzer`, schedules OFD operations with 24h success
/ 1h failure retry cadence, a 6h SCHEDULED timeout, and a 5-strike
circuit breaker.

Key design choices:
- Bulk-loads operations and history into maps (one query per type), then
iterates the stats list — O(types) queries, not O(tables).
- Uses the existing generic `find()` repository methods with null
params.
- Pure unit tests with Mockito — no Spring context needed.

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [x] Tests

**Core**: `AnalyzerRunner` — loads table_stats, pre-loads operations and
history into maps, evaluates each table against all analyzers, circuit
breaker logic.

**Strategy interface**: `OperationAnalyzer` — `isEnabled(table)`,
`shouldSchedule(table, currentOp, latestHistory)`,
`getCircuitBreakerThreshold()`.

**Cadence policy**: `CadencePolicy` — encapsulates time-based retry
logic shared across operation types.

**OFD analyzer**: `OrphanFilesDeletionAnalyzer` — enabled via
`maintenance.optimizer.ofd.enabled` table property.

## Testing Done

- [ ] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

25 unit tests:
- `AnalyzerRunnerTest` (7 tests) — eligible table insertion, cadence
skip, disabled table, shouldSchedule=false, null UUID, circuit breaker
trip, below-threshold pass
- `OrphanFilesDeletionAnalyzerTest` (18 tests) — isEnabled variants,
shouldSchedule for no-op/PENDING/SCHEDULING/SCHEDULED with history
combinations

```
./gradlew :apps:optimizer-analyzer:test
# BUILD SUCCESSFUL — 25 tests pass
```

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [x] Large PR broken into smaller PRs, and PR plan linked in the
description.

---------

Co-authored-by: mkuchenbecker <mkuchenbecker@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Abhishek Nath <anath1@linkedin.com>
@abhisheknath2011 abhisheknath2011 marked this pull request as ready for review May 27, 2026 19:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants