feat(optimizer): [4/N] Scheduler app #534
…le_name) Add a composite secondary index on (database_name, table_name) to table_operations_history at the schema and entity layers. This backs a new name-based history-lookup endpoint added on optimizer-2; without the index, the query degrades to a full scan on a table that grows with every operation completion. The other three optimizer tables get no new indexes — no new query patterns on them this round. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…istory GET - Rename @RequestMapping prefix on the three optimizer controllers to share a /v1/optimizer/... namespace: /v1/table-operations -> /v1/optimizer/operations /v1/table-operations-history -> /v1/optimizer/operations-history /v1/table-stats -> /v1/optimizer/stats - Add TableByNameController hosting human/analyst-oriented name-keyed reads under /v1/optimizer/databases/{databaseName}/ tables/{tableName}. Today it carries one endpoint: GET .../operations-history (lists operation history by name). Other optimizer endpoints stay UUID-keyed because drop-and-recreate of a table produces a new optimizer identity (new stats, new storage, new operation history) and a name-only key would conflate two distinct identities. The new controller is structured for future expansion when more name-based use cases land. Backed by the composite index on table_operations_history (database_name, table_name) added on optimizer-0. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The apps/optimizer shared module was created in this PR with field names and column lengths that did not match the schema established in optimizer-0: - TableStatsRow.databaseId -> databaseName - TableOperationHistoryRow.submittedAt -> completedAt - database_name / table_name VARCHAR(255) -> VARCHAR(128) Repos updated to match (TableStatsRepository param, TableOperationHistoryRepository ORDER BY column). No services/optimizer or schema SQL change needed - those already used the correct names. This change was previously folded into a later commit on optimizer-3; moving it down to the PR that owns these files. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The empty @Configuration class did nothing. @SpringBootApplication on AnalyzerApplication already triggers @ComponentScan, which discovers all @Component-annotated beans without help. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The circuit breaker was hardcoded (threshold=5, no reset, no operator visibility) and forced the AnalyzerRunner to materialize the full history of every (table, operation_type) just to check the last N rows. Cadence policy only needs the single latest history entry; pulling everything was wasted I/O. Changes: - Remove getCircuitBreakerThreshold and isCircuitBroken from OperationAnalyzer. - Add a TODO documenting requirements for the eventual replacement (configurable threshold, exponential-backoff reset, operator-visible signal). - In AnalyzerRunner, fold history loading into a per-(uuid, type) map holding only the most-recent entry; drop the per-table history list and the isCircuitBroken call. - Add a TODO to switch the history scan to a windowed query that returns at most one row per (uuid, type). - Drop the two circuit-breaker tests from AnalyzerRunnerTest. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
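The "per-(uuid, type) map holding only the most-recent entry" can be pictured with a small in-memory reduction. This is a minimal sketch under assumptions, not the `AnalyzerRunner` code: `HistoryRow`, `Key`, and `latestPerKey` are hypothetical names, and the fields only mirror the columns mentioned in these commits.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

public class LatestHistorySketch {
  /** Hypothetical stand-in for a history row; field names are assumptions. */
  record HistoryRow(String tableUuid, String operationType, Instant completedAt, String status) {}

  /** Map key: one entry per (tableUuid, operationType). */
  record Key(String tableUuid, String operationType) {}

  /** Folds any number of history rows into one most-recent row per key. */
  static Map<Key, HistoryRow> latestPerKey(List<HistoryRow> rows) {
    return rows.stream()
        .collect(
            Collectors.toMap(
                r -> new Key(r.tableUuid(), r.operationType()),
                r -> r,
                // On a key collision keep the row with the later completedAt.
                BinaryOperator.maxBy(Comparator.comparing(HistoryRow::completedAt))));
  }
}
```

The map never holds more than one row per key, whatever the history depth, which is the memory bound the commit is after. The I/O saving still depends on the query returning fewer rows, as the TODO notes.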
The Analyzer evaluates cadence using only the most-recent history row per (table_uuid, operation_type); pulling the full history scan per analyzer pass is wasted I/O. Add a dedicated query that returns at most one row per (table_uuid, operation_type), restricted to a single operation type. The query uses a correlated MAX subquery for portability across MySQL and H2. For large history volume, a (operation_type, table_uuid, completed_at) index on the schema would make the subquery index-only; TODO noted in javadoc. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Switch the AnalyzerRunner from scanning every history row per analyzer pass to the dedicated findLatestPerTable query (added in apps/optimizer). The analyzer only consumes the latest entry per (table_uuid, operation_type); the previous full-history scan was bounded but unnecessary I/O. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…nce + TableOperation The analyzer was using raw Strings everywhere for operation type and status. Per-layer types: introduce analyzer-internal OperationType and OperationStatus enums in apps/optimizer-analyzer/model and convert at the entity boundary. The wire API (services/optimizer/api/model) and DB columns (apps/optimizer entity rows) keep their own representations and are unaffected. Changes: - New enums OperationType and OperationStatus in the analyzer model package. - TableOperation: operationType and status become enums. from(row) parses the String columns; toRow() emits .name() back. from() and pending() share a private build() factory. - TableOperation javadocs: drop "denormalized for display" wording. - OperationAnalyzer.getOperationType returns OperationType. - AnalyzerRunner: filter parameter and per-type maps are keyed on OperationType; calls to repos still pass the String .name(). - CadencePolicy.shouldSchedule: switch on OperationStatus is exhaustive (now including CANCELED), unknown values throw IllegalStateException, and the SCHEDULED branch has an inline comment explaining the two cases. - OrphanFilesDeletionAnalyzer: returns the enum. - Tests updated to construct enum values; OFD test helper takes the enum. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
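A rough sketch of the pattern this commit describes: parse the String column into an enum at the boundary, then switch over every status with a throwing default. The enum values other than CANCELED and SCHEDULED, the method signatures, and the handling of each branch are assumptions for illustration. The 24h / 1h / 6h durations are taken from the analyzer PR description, and the real `CadencePolicy` may combine current-operation and history state differently.

```java
import java.time.Duration;
import java.time.Instant;

public class CadenceSketch {
  /** Assumed status values; the real analyzer enum may differ. */
  enum OperationStatus { PENDING, SCHEDULING, SCHEDULED, SUCCESS, FAILED, CANCELED }

  static final Duration SUCCESS_INTERVAL = Duration.ofHours(24);
  static final Duration FAILURE_RETRY = Duration.ofHours(1);
  static final Duration SCHEDULED_TIMEOUT = Duration.ofHours(6);

  /** Entity boundary: String column in, enum out. */
  static OperationStatus parseStatus(String column) {
    try {
      return OperationStatus.valueOf(column);
    } catch (IllegalArgumentException | NullPointerException e) {
      throw new IllegalStateException("Unknown operation status: " + column, e);
    }
  }

  /** Entity boundary: enum in, String column out. */
  static String toColumn(OperationStatus status) {
    return status.name();
  }

  /** Decides whether a new operation should be created, given the latest known status. */
  static boolean shouldSchedule(OperationStatus status, Instant lastChangeAt, Instant now) {
    Duration elapsed = Duration.between(lastChangeAt, now);
    switch (status) {
      case PENDING:
      case SCHEDULING:
        return false; // already queued or being claimed
      case SCHEDULED:
        // Two cases: within the timeout the job is presumed in flight (leave it alone);
        // past the timeout it is presumed lost (schedule again).
        return elapsed.compareTo(SCHEDULED_TIMEOUT) > 0;
      case SUCCESS:
        return elapsed.compareTo(SUCCESS_INTERVAL) >= 0;
      case FAILED:
      case CANCELED: // assumption: treated like a failure for retry purposes
        return elapsed.compareTo(FAILURE_RETRY) >= 0;
      default:
        throw new IllegalStateException("Unhandled status: " + status);
    }
  }
}
```

Keeping the `default` branch means a status added to the enum later fails loudly instead of silently falling through to "do not schedule".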
…denceBasedOrphanFilesDeletionAnalyzer The class composes CadencePolicy and is one of potentially many strategies (volume-based, schema-aware, etc.) we could write later for the same operation type. Encode the scheduling driver in the class name so the distinction is visible at registration. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The rename in the previous commit moved the files but did not change the class identifiers inside. Update both class declarations and the constructor calls in the test to match the new file name. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add idx_toph_optype_uuid_completed (operation_type, table_uuid, completed_at) on table_operations_history. TableOperationHistoryRepository.findLatestPerTable uses a correlated MAX(completed_at) subquery; without this index it degenerates to O(N²) and does not complete at 1M-row history scale. With it the inner subquery becomes an index-only lookup per outer row. Update the repo method's javadoc to point at the new index by name and drop the resolved TODO. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…nalyzer Per PR #533 review (abhisheknath2011, 2026-05-26): > As analyzer is going to a service with APIs exposed, can we move the > code to the services module instead of keeping under apps module? Move: - apps/optimizer/analyzer/ → services/optimizer/analyzer/ - Gradle module :apps:optimizer:analyzer → :services:optimizer:analyzer - Package unchanged (com.linkedin.openhouse.optimizer.analyzer) Same shape will land on opt-4 (apps/optimizer/scheduler → services/ optimizer/scheduler). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…per-database

Per PR #533 review (abhisheknath2011):

> Is this related to max default number of tables to be processed by one
> execution cycle of Analyzer run? Can we update the doc here and reflect
> the same on the property name as well?

Yes - it bounds the per-database working set across the three pre-load reads. Rename for clarity:

- Property: optimizer.repo.default-limit → analyzer.max-tables-per-database
- Field: defaultLimit → maxTablesPerDatabase
- Drop the Mockito-flavored implementation note; replace with a javadoc describing semantics (per-database cap; tables beyond the bound defer to the next cycle).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Per PR #533 review (abhisheknath2011):

> Going by the paging default limit of 10k, the DBs with more than 10k
> tables would not be processed. Can we ensure that all the tables within
> a given DB are processed by adjusting the pagination list until we reach
> the end?

Previous code fetched page 0 only; tables past index 10000 were silently dropped, not "deferred to the next cycle" as the stale doc claimed. Now analyzeDatabase iterates pages until the page comes back smaller than the page size (terminates on partial or empty page).

- Wrap the read + per-table loop in a while-pageNumber loop.
- Re-load currentOps and latestHistory per page (per-page bound on the in-memory maps; affected tables whose ops fall in a different page get treated as "no current op" → may produce a duplicate PENDING that the scheduler's cancelDuplicates handles).
- Rename property: analyzer.max-tables-per-database → analyzer.tables-page-size (it caps a page, not the cycle)
- Rename field: maxTablesPerDatabase → tablesPageSize
- New test: analyze_iteratesAllPages_processesEveryTableAcrossPageBoundary exercises 3 tables across 2 pages with page size 2.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…se/optimizer/analyzer/AnalyzerRunner.java Co-authored-by: Abhishek Nath <anath1@linkedin.com>
…ps/optimizer/analyzerapp (#602) ## Status **(WIP)** — inspection branch on top of `mkuchenb/optimizer-3` (PR #533). Splits the analyzer into library + deployable. ## Summary Library/deployable split: - `services/optimizer/analyzer/` — library (analysis logic; `AnalyzerRunner`, `OperationAnalyzer`, `CadencePolicy`, `CadenceBasedOrphanFilesDeletionAnalyzer`) - `apps/optimizer/analyzerapp/` — deployable Spring Boot wrapper (just `AnalyzerApplication` + `application.properties`) The analysis code stays in `services/`; only the `@SpringBootApplication` entry point moves to `apps/`. ## Why `analyzerapp` and not `analyzer` as the leaf name Two Gradle leaf projects both named `analyzer` — one at `:services:optimizer:analyzer`, one at `:apps:optimizer:analyzer` — produced a self-referential `compileJava → compileJava` cycle. Disambiguating the apps leaf to `analyzerapp` avoids it. The exact Gradle mechanism wasn't pinned down (could be the leaf collision, could be the also-colliding implicit parent at `:apps:optimizer` vs explicit `:services:optimizer`), but the rename is the smallest change that makes it build. ## Build changes `services/optimizer/analyzer/build.gradle`: - `api project(':services:optimizer')` so `OperationTypeDto`, repos, etc. are visible on consumers' compile classpath. - `bootJar { enabled = false }` — no `@SpringBootApplication` here anymore. - `jar.archiveClassifier = ''` so the library publishes `analyzer.jar`. `apps/optimizer/analyzerapp/build.gradle` (new): - `openhouse.springboot-ext-conventions` + Spring Boot 2.7.8 plugins. - `implementation project(':services:optimizer:analyzer')` + Spring Boot starter + JPA + MySQL driver. `settings.gradle`: `include ':apps:optimizer:analyzerapp'`. ## Testing - `./gradlew :services:optimizer:analyzer:test` — 20 tests green. - `./gradlew :apps:optimizer:analyzerapp:bootJar` — produces `build/analyzerapp/libs/analyzerapp.jar`. 
## What does NOT change - Package: every class keeps `com.linkedin.openhouse.optimizer.analyzer`. `AnalyzerApplication`'s `@SpringBootApplication` component-scan picks up the library classes because they sit in the same package, just in a different module on the classpath. - The scheduler (opt-4) is unchanged in this PR. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: mkuchenbecker <mkuchenbecker@users.noreply.github.com> Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Brings PR #602's analyzer library/deployable split: - services/optimizer/analyzer/ (library) - apps/optimizer/analyzerapp/ (deployable) Plus the pagination removal in AnalyzerRunner (all three pre-load reads now use Pageable.unpaged() — aligned per-page pagination was incorrect). Settings.gradle resolved to include the three relevant lines: :services:optimizer:analyzer :apps:optimizer:analyzerapp :apps:optimizer:scheduler (will be split similarly in the next commit)
…r + apps/optimizer/schedulerapp; drop pagination

Mirrors the analyzer's library/deployable split (PR #602) on the scheduler:

- services/optimizer/scheduler/ (library; bootJar disabled, jar.classifier='')
- apps/optimizer/schedulerapp/ (deployable: SchedulerApplication + application.properties)

settings.gradle:

- drop ':apps:optimizer:scheduler'
- add ':services:optimizer:scheduler'
- add ':apps:optimizer:schedulerapp'

Package unchanged: com.linkedin.openhouse.optimizer.scheduler.

Substantive change in SchedulerRunner.schedule():

- Drop @Value("\${optimizer.repo.default-limit:10000}") + defaultLimit field.
- Switch both reads to Pageable.unpaged():
  - the PENDING-load operationsRepo.find(...)
  - the post-claim re-query for SCHEDULING rows tagged with this cycle's scheduledAt watermark
- The previous PageRequest.of(0, defaultLimit) pattern was the same bug we fixed on the analyzer: a single page is fetched and anything past it is silently dropped. Correctness requires the full set per cycle; working set bounded by count(matching rows in this cycle), tracked in BDP-102738.

application.properties drops optimizer.repo.default-limit (no longer used).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
## Optimizer Stack

| PR | Content |
|---|---|
| #527 | Data Model |
| #530 | Database Repos |
| #531 | REST service |
| #533 **(this)** | Analyzer app |
| #534 | Scheduler app |
| #599 | Spark BatchedOFD app |
| #tbd | Infra, docker-compose, smoke test |

## Summary

PR 3 of N in the optimizer stack.

Introduces `apps/optimizer-analyzer`, a Spring Boot CommandLineRunner that evaluates every table in `table_stats` against pluggable `OperationAnalyzer` strategies. The first strategy, `OrphanFilesDeletionAnalyzer`, schedules OFD operations with 24h success / 1h failure retry cadence, a 6h SCHEDULED timeout, and a 5-strike circuit breaker.

Key design choices:

- Bulk-loads operations and history into maps (one query per type), then iterates the stats list - O(types) queries, not O(tables).
- Uses the existing generic `find()` repository methods with null params.
- Pure unit tests with Mockito - no Spring context needed.

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [x] Tests

**Core**: `AnalyzerRunner` - loads table_stats, pre-loads operations and history into maps, evaluates each table against all analyzers, circuit breaker logic.

**Strategy interface**: `OperationAnalyzer` - `isEnabled(table)`, `shouldSchedule(table, currentOp, latestHistory)`, `getCircuitBreakerThreshold()`.

**Cadence policy**: `CadencePolicy` - encapsulates time-based retry logic shared across operation types.

**OFD analyzer**: `OrphanFilesDeletionAnalyzer` - enabled via `maintenance.optimizer.ofd.enabled` table property.

## Testing Done

- [ ] Manually Tested on local docker setup. Please include commands ran, and their output.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in production. Please explain.

25 unit tests:

- `AnalyzerRunnerTest` (7 tests) - eligible table insertion, cadence skip, disabled table, shouldSchedule=false, null UUID, circuit breaker trip, below-threshold pass
- `OrphanFilesDeletionAnalyzerTest` (18 tests) - isEnabled variants, shouldSchedule for no-op/PENDING/SCHEDULING/SCHEDULED with history combinations

```
./gradlew :apps:optimizer-analyzer:test
# BUILD SUCCESSFUL — 25 tests pass
```

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [x] Large PR broken into smaller PRs, and PR plan linked in the description.

---------

Co-authored-by: mkuchenbecker <mkuchenbecker@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Abhishek Nath <anath1@linkedin.com>
The scheduler module already depends on api project(':services:optimizer'),
so the canonical db/optimizer-schema.sql is on the test classpath. The
duplicate at scheduler/src/test/resources/schema.sql had silently diverged:
table_operations columns were widened (VARCHAR 255 vs 128) and gained an
unused `version` field; table_stats renamed `database_name`→`database_id`
and `snapshot`→`stats`; the table_stats_history and table_operations_history
tables were missing entirely. Scheduler tests had been passing only because
they touched the subset of columns/tables present in both schemas.
Point application-test.properties at classpath:db/optimizer-schema.sql and
delete the local copy so there is one source of truth.
All 17 scheduler tests pass against the canonical schema.
…use/optimizer/scheduler/SchedulerRunner.java Co-authored-by: Abhishek Nath <anath1@linkedin.com>
        Optional.empty());
    // Unpaged: the result set is already bounded by ids.size() (the bin we just claimed); no
    // need to cap it further.
    List<String> claimedIds =
As there will be multiple scheduler instances going forward, we need to ensure that an operation for a table is picked up by only one scheduler instance. Another question: can multiple operations run on a single table, and could they be picked up by different schedulers? There is also the distribution of DBs across different scheduler instances to consider. I guess we can think through these scenarios and adjust the code accordingly going forward. For the initial version this should be fine; we just need to ensure there are no race conditions.
Race conditions are handled by claiming today; even if we shard I don't want overlapping runs or retries to have an issue.
1. Scheduler gets N operations.
2. Scheduler attempts to claim each.
3. Scheduler actually schedules those it was able to claim.

(2) will fail if another scheduler races.
Sharding will enable parallelism, and we would want the shards working on disjoint sets of operations, but I think we still need state-machine protection against duplicate job submission, in particular for the operations that create new snapshots and can cause txn conflicts.
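The claim step discussed in this thread is a compare-and-set on the operation's status. The following is only an in-memory sketch of that idea, with a `ConcurrentMap` standing in for the operations table. `ClaimSketch`, `transition`, and `runCycle` are hypothetical names, and the real scheduler performs the equivalent check through repository updates rather than a map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ClaimSketch {
  enum Status { PENDING, SCHEDULING, SCHEDULED }

  /** In-memory stand-in for the operations table: operation id -> status. */
  private final ConcurrentMap<String, Status> table = new ConcurrentHashMap<>();

  void insertPending(String id) {
    table.put(id, Status.PENDING);
  }

  Status statusOf(String id) {
    return table.get(id);
  }

  /** Compare-and-set: succeeds only if the row is still in the expected state. */
  boolean transition(String id, Status expected, Status next) {
    return table.replace(id, expected, next);
  }

  /** One cycle: take candidates, try to claim each, schedule only what was claimed. */
  List<String> runCycle(List<String> candidates) {
    List<String> claimed = new ArrayList<>();
    for (String id : candidates) {
      // Loses the race (returns false) if another scheduler already moved the row.
      if (transition(id, Status.PENDING, Status.SCHEDULING)) {
        claimed.add(id);
      }
    }
    // Job submission would happen here, for the claimed ids only.
    for (String id : claimed) {
      transition(id, Status.SCHEDULING, Status.SCHEDULED);
    }
    return claimed;
  }
}
```

Running two cycles over the same candidates shows the protection: the second cycle claims nothing, because every row has already left PENDING. Sharding would reduce how often schedulers collide, but the status check is what prevents a duplicate submission when they do.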
…mizer.scheduler. Applies the convention from the prior results-endpoint rename to the remaining scheduler properties for consistency. jobs.base-uri -> optimizer.scheduler.jobs.base-uri scheduler.cluster-id -> optimizer.scheduler.cluster-id scheduler.ofd.max-files-per-bin -> optimizer.scheduler.ofd.max-files-per-bin Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wrap SpringApplication.run in SpringApplication.exit + System.exit so the context is closed (PreDestroy hooks, JPA pool drain, etc.) and the JVM returns a deterministic exit code after the CommandLineRunner completes. Matches the standard Spring Boot batch-style entry point and is what k8s cron jobs need. Analyzer left unchanged for now; will be applied separately. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Apply the reviewer's full suggestion (#534): SchedulerApplication implements CommandLineRunner + ExitCodeGenerator directly, wraps the work in try/catch, tracks exitCode, and reports it via getExitCode(). SpringApplication.exit propagates that to System.exit so the k8s CronJob pod status reflects batch outcome. Removes the prior @Bean CommandLineRunner. Also adds spring.main.banner-mode=off per the suggestion. Verified with the boot jar: - empty H2 schema (runner throws) -> caught, JPA pool drained, exit 1 - schema preloaded, no PENDING ops -> exit 0 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
     * it is shared across all bins regardless of operation type.
     */
    @RequiredArgsConstructor
    public class Bin {
I am introducing BinItem as part of this PR - https://github.com/linkedin/openhouse/pull/599/changes#diff-5e026d8449953ed5f2853964c9fc6427827dac24fa3b9dba10318fb6618fb703 to represent data at a granular level. I will rebase my PR once this PR is merged.
    @Getter private final OperationTypeDto operationType;
    @Getter private final List<TableOperationDto> operations;
Shall we keep bin packing generic, as a common utility, instead of referencing internal models and operations?
That should give more flexibility, and we should be able to integrate well with the optimizer flow as well as the existing scheduler flow. I am planning to use it as a common lib, as in this PR - https://github.com/linkedin/openhouse/pull/604/changes#diff-bd8bddafa29e6a0d0dcc04642cf89b969c4890f53efa9828826e51c25f970a7d.
## Optimizer Stack

## Summary

PR 4 of N in the optimizer stack.

Introduces `apps/optimizer-scheduler`, a Spring Boot CommandLineRunner that claims PENDING operations and submits batched Spark jobs via the Jobs Service.

State machine:

- Analyzer creates all Operations as PENDING

## Changes

**Scheduler runner**: Loads PENDING ops, bin-packs by file count, claims via two-step CAS (PENDING → SCHEDULING → SCHEDULED), submits one Spark job per bin.

**Bin packer**: Greedy first-fit descending algorithm. Oversized tables get their own bin (never dropped). Tables with no stats default to cost 0.
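
The packing rule described above can be sketched as follows. This is a hypothetical illustration, not the PR's `BinPacker`: `Item`, `Bin`, `pack`, and the `maxFilesPerBin` parameter are assumed names, and a `null` file count stands in for "no stats".

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class BinPackSketch {
  /** Hypothetical table entry; a null fileCount means the table has no stats. */
  record Item(String tableName, Long fileCount) {
    long cost() {
      return fileCount == null ? 0L : fileCount; // no stats -> cost 0
    }
  }

  static final class Bin {
    final List<Item> items = new ArrayList<>();
    long total = 0;

    void add(Item item) {
      items.add(item);
      total += item.cost();
    }
  }

  /** First-fit descending: sort by cost, place each item in the first bin with room. */
  static List<Bin> pack(List<Item> input, long maxFilesPerBin) {
    List<Item> sorted = new ArrayList<>(input);
    sorted.sort(Comparator.comparingLong(Item::cost).reversed());

    List<Bin> bins = new ArrayList<>();
    for (Item item : sorted) {
      Bin target = null;
      for (Bin bin : bins) {
        if (bin.total + item.cost() <= maxFilesPerBin) {
          target = bin;
          break;
        }
      }
      if (target == null) {
        // No bin has room. An oversized item also lands here and gets a bin of its own;
        // that bin is already over the limit, so nothing else will ever fit into it.
        target = new Bin();
        bins.add(target);
      }
      target.add(item);
    }
    return bins;
  }
}
```

With a limit of 10 and costs 20, 6, 5, 4 and one table without stats, this yields three bins: the oversized table alone, then 6 + 4 + 0, then 5.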

**Jobs client**: WebClient-based REST client submitting `POST /jobs` to the Jobs Service with table names, operation IDs, and results endpoint.

**Repository additions**: Three `@Modifying` CAS methods on `TableOperationsRepository` (`cancelDuplicatePending`, `markScheduling`, `markScheduled`), required for safe concurrent scheduling.

## Testing Done

13 unit tests:

- `BinPackerTest` (7 tests) - empty input, single table, under/over limit, oversized table, no stats, descending sort
- `SchedulerRunnerTest` (6 tests) - no pending ops, two-step claim + schedule, launch failure, already-claimed skip, duplicate cancellation, multi-row bin claim

# Additional Information