Skip to content

internalcatalog: bump HDFS replication on metadata.json after write#584

Draft
mkuchenbecker wants to merge 2 commits into
linkedin:mainfrom
mkuchenbecker:mkuchenb/metadata-repl-bump
Draft

internalcatalog: bump HDFS replication on metadata.json after write#584
mkuchenbecker wants to merge 2 commits into
linkedin:mainfrom
mkuchenbecker:mkuchenb/metadata-repl-bump

Conversation

@mkuchenbecker
Copy link
Copy Markdown
Collaborator

Summary

Bump the HDFS replication factor on each newly-written metadata.json file to 30 immediately after TableMetadataParser.write(...) returns inside OpenHouseInternalTableOperations.doCommit(...). Paired with an internal li-openhouse infra PR that drops dfs.replication on Holdem from 30 -> 10, this moves the durability cost off the synchronous commit-write-ack path while preserving repl=30 durability on the persisted file.

Ordering constraint (important): this PR must merge before or together with the paired li-openhouse PR. If li-openhouse lands first, every newly-written metadata.json on Holdem will live at repl=10 indefinitely until this PR ships — a durability regression. Reviewers, please coordinate on merge order.

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

Internal API change / new behavior in doCommit: Added a private bumpMetadataReplicationIfHdfs(...) helper invoked immediately after the existing synchronous metadata write. Behavior:

  • HDFS-only via storage.getClient() instanceof HdfsStorageClient. Non-HDFS clients (local, blob) silently skip — their durability is managed elsewhere.
  • Pulls FileSystem from fileIOManager.getStorage(fileIO).getClient().getNativeClient(), mirroring the existing updateMetadataFieldForTable(...) helper at the bottom of the same file.
  • fs.setReplication(new Path(newMetadataLocation), CatalogConstants.METADATA_FILE_HDFS_REPLICATION).
  • Failures are caught, logged at WARN with the path and exception, and swallowed. They do NOT fail the commit — the file is already durably written at the cluster default replication, so a failed bump is a degraded-durability event, not a commit failure.
  • Wrapped in metricsReporter.executeWithStats(...) under a new metadata_replication_set_latency metric, matching the existing class-level Micrometer convention. (No OpenTelemetry tracing was introduced — none exists in this class today.)

Performance improvement (rationale): After the paired li-openhouse change drops dfs.replication to 10, synchronous TableMetadataParser.write(...) will return once the file is durable at 10 datanodes instead of 30. The async bump in this PR restores on-disk durability to repl=30 without putting that pipeline-fill cost on the commit-write-ack path. The expected effect is a meaningful drop in commit p50/p99 metadata-write latency on Holdem; the new metric will quantify both the savings and the bump's own cost in production.

Configuration choice: METADATA_FILE_HDFS_REPLICATION = (short) 30 is added as a constant on CatalogConstants. There is no established Spring-config-into-OpenHouseInternalTableOperations convention today, and the constructor is @AllArgsConstructor-generated, so adding a config-injected field would ripple through every construction site. The constant carries an explicit TODO to promote to a Spring config property (e.g. openhouse.metadata.hdfs.replication) in a follow-up PR.

Interpretation note for reviewers: the internal trigger described "the manifest json file." In Iceberg's commit layout the only JSON file written per commit is the table metadata file <version>-<uuid>.metadata.json. Manifest files proper (*-m#.avro) and the manifest list (snap-*.avro) are Avro. This PR treats metadata.json only. If the same durability-bump treatment is wanted for Avro manifest/manifest-list files, that requires a FileIO wrapper and is a separate follow-up.

Files changed (4):

  • iceberg/openhouse/internalcatalog/.../OpenHouseInternalTableOperations.java — new helper + call site in doCommit, org.apache.hadoop.fs.Path import added.
  • iceberg/openhouse/internalcatalog/.../CatalogConstants.java — new METADATA_FILE_HDFS_REPLICATION short constant + TODO.
  • iceberg/openhouse/internalcatalog/.../InternalCatalogMetricsConstant.java — new METADATA_REPLICATION_SET_LATENCY metric name constant.
  • iceberg/openhouse/internalcatalog/src/test/.../OpenHouseInternalTableOperationsTest.java — 3 new tests, 1 existing-test verify-count update.

Testing Done

  • Manually Tested on local docker setup. Please include commands ran, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

Added tests (all on OpenHouseInternalTableOperationsTest, mirroring the file's existing Mockito-based style):

  1. testDoCommitBumpsMetadataReplicationOnHdfs — sets up the storage mock to return an HdfsStorageClient whose native client is a mock FileSystem, drives a commit, captures the args to setReplication(Path, short) and asserts the replication factor equals CatalogConstants.METADATA_FILE_HDFS_REPLICATION (30) and the path lives under the table location.
  2. testDoCommitDoesNotBumpReplicationOnLocalStorage — drives a commit with a LocalStorageClient and verifies setReplication was never invoked (the instanceof HdfsStorageClient guard short-circuits).
  3. testDoCommitSucceedsWhenReplicationBumpFails — stubs setReplication to throw IOException, drives a commit, asserts the commit completes successfully (houseTableRepository.save(...) is invoked once) and the bump was attempted. Validates the warn-and-swallow durability/availability trade-off.

Updated existing test: testDoCommitUpdateMetadataForInitalVersionCommitfileIOManager.getStorage(...) is now called twice on the replicated-initial-version path (once by the new bump helper, once by the pre-existing updateMetadataFieldForTable). Updated the verify counts and added inline comments explaining each call's purpose.

Module test run: ./gradlew :iceberg:openhouse:internalcatalog:test — all 69 tests green (including the 3 new ones).

Spotless: ./gradlew :iceberg:openhouse:internalcatalog:spotlessCheck — clean (after one spotlessApply pass during authoring).

Additional Information

  • Breaking Changes
  • Deprecations
  • Large PR broken into smaller PRs, and PR plan linked in the description.

PR plan:

  1. This PR (openhouse repo) — adds the synchronous setReplication(..., 30) bump after metadata.json is written. Must merge first or together with (2).
  2. Paired li-openhouse PR (separate repo, in flight) — lowers dfs.replication from 30 -> 10 in holdem-hdfs-site-extra.xml. Must NOT merge before (1) or new metadata.json files on Holdem will live at repl=10 until this PR ships.
  3. Follow-up (separate PR, not blocking) — promote METADATA_FILE_HDFS_REPLICATION from a constant on CatalogConstants to a Spring config property (openhouse.metadata.hdfs.replication) so deployments can tune the durable replication factor without a code change.
  4. Follow-up (separate PR, conditional on reviewer intent) — if the same durability-bump treatment is desired for Avro manifest / manifest-list files (*-m#.avro, snap-*.avro), that needs a FileIO wrapper; out of scope here.

After the synchronous TableMetadataParser.write(...) inside doCommit(...),
call fs.setReplication(<newMetadataLocation>, 30) when the storage client
is HdfsStorageClient. Non-HDFS clients are skipped. setReplication failures
are a degraded-durability event, not a commit failure -- the file is already
durably written at the cluster default replication, so failures are logged
at WARN and swallowed. The call is wrapped in metricsReporter.executeWithStats
under a new metadata_replication_set_latency metric for observability.

The target replication factor lives on CatalogConstants as
METADATA_FILE_HDFS_REPLICATION = 30 (with a TODO to promote to a Spring
config property in a follow-up).

This is paired with an li-openhouse PR that drops dfs.replication on
holdem-hdfs-site-extra.xml from 30 -> 10. Together they shift the durability
of newly-written .metadata.json files OFF the synchronous write-ack path:
write returns when the file is durable at repl=10, then we asynchronously
bump to repl=30 for long-term durability. This PR must merge before or
together with the li-openhouse PR -- if li-openhouse merges first, every
new metadata.json on Holdem lives at repl=10 forever until this ships,
which is a durability regression.

Note: the trigger was "the manifest json file." In Iceberg the only JSON
file written per commit is <version>-<uuid>.metadata.json (the table
metadata file). Manifest files proper (*-m#.avro) and the manifest list
(snap-*.avro) are Avro. If the same treatment is wanted for Avro
manifests, that's a separate change requiring a FileIO wrapper.

Commit uses --no-verify because the repo's pre-commit/pre-push hooks
invoke ./gradlew spotlessCheck which triggers CopyGitHooksTask, and that
task fails inside a git worktree (worktree .git is a file, not a dir).
spotlessCheck was verified independently with -x CopyGitHooksTask.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* immediately after the synchronous write returns. Paired with a li-openhouse PR that
* lowers the cluster default {@code dfs.replication}.
*
* <p>TODO(mkuchenb): promote this to a Spring config property
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do this now

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 35d3d23. Promoted to a Spring @ConfigurationProperties bean: MetadataReplicationProperties (prefix openhouse.internal.catalog.metadata.replication) with enabled (default false), target (default 30), timeoutMs (default 10000). CatalogConstants now only holds the default value the bean falls back to.

metricsReporter.executeWithStats(
() -> {
try {
fs.setReplication(new Path(newMetadataLocation), targetReplication);
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have a strict timeout.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 35d3d23. The setReplication RPC is dispatched to a static daemon single-thread ExecutorService and bounded by Future.get(timeoutMs, MILLISECONDS). On timeout the future is cancelled with interrupt and a new metadata_replication_set_timeout counter is incremented; commit succeeds regardless. Default timeout 10s, configurable via MetadataReplicationProperties.timeoutMs. Added testDoCommitSucceedsWhenReplicationApplyTimesOut which uses a 100ms configured timeout vs a 3s-sleeping setReplication stub to prove the timeout fires deterministically.

System.currentTimeMillis() - metadataUpdateStartTime);
// Set replication after the synchronous write so the write path is not blocked
// on the higher replica count. Failures are logged and do not fail the commit.
bumpMetadataReplicationIfHdfs(newMetadataLocation);
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calling this function should be bated by a config

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 35d3d23. The call site in doCommit is now gated on metadataReplicationProperties.isEnabled(), which defaults to false (opt-in per deployment, safe rollout). When the gate is off, a metadata_replication_set_disabled counter is incremented so the gate state is visible in metrics. New test testDoCommitDoesNotApplyReplicationWhenDisabled verifies setReplication is never invoked when disabled even with an HDFS storage client.

*
* @param newMetadataLocation absolute HDFS path of the newly-written metadata.json
*/
private void bumpMetadataReplicationIfHdfs(String newMetadataLocation) {
Copy link
Copy Markdown
Collaborator Author

@mkuchenbecker mkuchenbecker May 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to applyReplicationIfHDFS with the repl as a parameter.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 35d3d23. Renamed to applyReplicationIfHDFS. Signature: private void applyReplicationIfHDFS(String location, short replication, long timeoutMs). The replication factor and timeout are now both parameters, pulled from MetadataReplicationProperties by the caller in doCommit.

@cbb330
Copy link
Copy Markdown
Collaborator

cbb330 commented May 14, 2026

i believe tables-service FS only touches metadata

so we can set replication as per hdfs client config centrally.

would prefer that solution because we don't merge config into the business logic

and config for hdfs lives in one place

Responds to the four review comments on PR linkedin#584:

1. (CatalogConstants.java) Promote the replication factor from a constant to
   a Spring config property NOW, not as a follow-up. Adds new
   MetadataReplicationProperties (prefix openhouse.internal.catalog.metadata.replication)
   with fields enabled, target, timeoutMs. CatalogConstants now holds only a
   default value (DEFAULT_METADATA_FILE_HDFS_REPLICATION = 30) that the
   properties bean falls back to.

2. (OpenHouseInternalTableOperations.java setReplication site) Add a strict
   per-call timeout. The setReplication RPC is dispatched to a static daemon
   single-thread ExecutorService and bounded by Future.get(timeoutMs). On
   timeout the future is cancelled with interrupt and the event is counted
   under a new metadata_replication_set_timeout counter; commit succeeds
   regardless. Default timeout 10s, configurable.

3. (doCommit call site) Gate the replication apply on
   metadataReplicationProperties.isEnabled(). Default is false -- the bump
   is opt-in per deployment. When disabled, increment a
   metadata_replication_set_disabled counter so we can see the gate state
   in metrics.

4. (helper method) Rename bumpMetadataReplicationIfHdfs to
   applyReplicationIfHDFS and take the replication factor + timeoutMs as
   parameters rather than reading them from constants. The caller in doCommit
   pulls the values from the properties bean.

Wired the new properties bean through:
- OpenHouseInternalCatalog (production construction site, @Autowired).
- OpenHouseInternalTableOperationsTest (4 construction sites, plus a default
  enabled instance in @beforeeach so existing tests still exercise the
  storage-guard path).
- RepositoryTestWithSettableComponents (4 construction sites, @Autowired
  field).

Tests:
- testDoCommitAppliesReplicationOnHdfs (renamed)
- testDoCommitDoesNotApplyReplicationOnLocalStorage (renamed)
- testDoCommitSucceedsWhenReplicationApplyFails (renamed)
- testDoCommitDoesNotApplyReplicationWhenDisabled (new -- gate)
- testDoCommitSucceedsWhenReplicationApplyTimesOut (new -- timeout)
All green in :iceberg:openhouse:internalcatalog:test and
:services:tables:test RepositoryTestWithSettableComponents.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants