NIFI-15919: Ensure that we only sync Connectors with Connector Config…#11231

Merged
kevdoran merged 1 commit into apache:main from markap14:NIFI-15919 on May 12, 2026

Conversation

@markap14
Contributor

…uration Provider when necessary

Summary

NIFI-00000

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull request contains commits signed with a registered key indicating Verified status

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

@markap14 markap14 force-pushed the NIFI-15919 branch 2 times, most recently from 02ad8a1 to 71ba618 on May 11, 2026 at 14:50
Contributor

@mcgilman mcgilman left a comment


Thanks for the PR @markap14! A couple comments below based on a first pass.

@markap14 markap14 marked this pull request as ready for review May 11, 2026 15:40
@markap14
Contributor Author

[claude-opus-4.7] CI failure on `ubuntu-24.04 Java 21` is a pre-existing system-test flake, not related to this PR.

Failing job: system-tests / ubuntu-24.04 Java 21

Tests that errored (8 total):

  • `ClusteredConnectorIT.testBundleResolutionRequireExactBundle`
  • `ClusteredConnectorIT.testBundleResolutionResolveNewestBundle`
  • `ClusteredConnectorIT.testCreateConfigureRestart`
  • `ClusteredConnectorIT.testBundleResolutionResolveBundle`
  • `ClusteredConnectorIT.testSecretReferences`
  • `ClusteredConnectorIT.testDeleteConnectorNoDataQueued`
  • `ClusteredConnectorDrainIT.testDrainWithNodeCompletingAtDifferentTimes`
  • `ClusteredConnectorDrainIT.testCancelDrainWithOneNodeAlreadyComplete`

Error (same on all 8):
```
org.apache.nifi.toolkit.client.NiFiClientException: Error applying connector update: Node localhost:5672 is unable to fulfill this request due to: This node is disconnected from its configured cluster. The requested change will only be allowed if the flag to acknowledge the disconnected node is set.
Caused by: jakarta.ws.rs.ClientErrorException: HTTP 409 Conflict
at org.apache.nifi.toolkit.client.impl.AbstractJerseyClient.executeAction(AbstractJerseyClient.java:96)
at org.apache.nifi.toolkit.client.impl.JerseyConnectorClient.applyUpdate(JerseyConnectorClient.java:422)
```

Each test fails in <1s on the very first API call to node-2; the troubleshooting log shows the cluster coordinator believes both nodes are CONNECTED while node-2 itself answers 409 "disconnected". Tests share a cached two-node NiFi instance and the inconsistency carries from a prior test in the same run.

Why this is a pre-existing flake, not caused by the PR:

  1. The exact same eight tests failed on `main` on May 5 in run 25366699710 / ubuntu-24.04 Java 21 with the same elapsed times and the same `Node localhost:5672 ... disconnected from its configured cluster` error.
  2. `ubuntu-24.04 Java 25` (same module, same shard, different JDK) passed in this run, so it isn't a deterministic break.
  3. The failures are coordinator/node cluster state desync, not anything in the connector sync-mode path this PR touches.

Plan: Once the parent `system-tests` run finishes, I'll rerun the failed shard.

@markap14
Contributor Author

[claude-opus-4.7] Reran the failed ubuntu-24.04 Java 21 shard now that parent run 25678529070 has completed. New job: https://github.com/apache/nifi/actions/runs/25678529070/job/75399033858

Contributor

@kevdoran kevdoran left a comment


Aside from the inline comments, here is general PR feedback, mainly around cases where LOCAL_ONLY should be used and SYNC_WITH_PROVIDER is not needed:

Good question. Let me classify the methods on StandardConnectorRepository by whether they internally trigger a provider load, then walk the SYNC_WITH_PROVIDER call sites and grade each one.

Repository methods that already do (or should do) a provider load internally

From StandardConnectorRepository.java:

| Method | Internal provider activity |
| --- | --- |
| addConnector(connector) | calls syncFromProvider(connector) — provider.load |
| getConnector(id, SYNC_WITH_PROVIDER) | provider.load |
| getConnectors(SYNC_WITH_PROVIDER) | provider.load × N |
| applyUpdate(connector, ctx) | calls syncAssetsFromProvider → provider.syncAssets + provider.load (synchronously, before the async submit) |
| configureConnector(connector, step, cfg) | buildMergedWorkingConfiguration calls provider.load, then provider.save |
| syncAssetsFromProvider(connector) | provider.syncAssets + provider.load |
| syncConnector(versionedConnector) | provider.getSyncDirective |
| removeConnector(id) | provider.delete |
| updateConnector(connector, name) | provider.save only — no load |
| discardWorkingConfiguration(connector) | provider.discard |
| startConnector / stopConnector / restartConnector | none |
| inheritConfiguration | none |
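The LOCAL_ONLY vs. SYNC_WITH_PROVIDER branch the table is classifying can be sketched roughly as follows. These are simplified stand-in types, not the actual NiFi classes, and the load counter is purely illustrative:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

enum ConnectorSyncMode { LOCAL_ONLY, SYNC_WITH_PROVIDER }

class SketchRepository {
    private final Map<String, String> connectors = new ConcurrentHashMap<>();
    int providerLoads = 0; // counts simulated provider.load calls

    SketchRepository() {
        connectors.put("c1", "local-config");
    }

    private void syncFromProvider(String id) {
        providerLoads++; // stand-in for configurationProvider.load(id)
    }

    // Only SYNC_WITH_PROVIDER pays the provider round trip; LOCAL_ONLY reads in-memory state.
    String getConnector(String id, ConnectorSyncMode syncMode) {
        if (syncMode == ConnectorSyncMode.SYNC_WITH_PROVIDER) {
            syncFromProvider(id);
        }
        return connectors.get(id);
    }
}
```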

Walking each SYNC_WITH_PROVIDER use in the PR

The PR introduces SYNC_WITH_PROVIDER in six places. Three of them — the start paths — were already covered in the previous summary as functionally wrong because the provider doesn't own the active config. Setting those aside, here's the rest:

A. StandardConnectorDAO.getConnector(String id) (no-arg default → SYNC)

    @Override
    public ConnectorNode getConnector(final String id) {
        // Public read returned to clients; must reflect the latest configuration from the provider.
        return requireConnector(id, ConnectorSyncMode.SYNC_WITH_PROVIDER);
    }

    @Override
    public List<ConnectorNode> getConnectors() {
        // Public read returned to clients; must reflect the latest configuration from the provider.
        return getConnectorRepository().getConnectors(ConnectorSyncMode.SYNC_WITH_PROVIDER);
    }

These default to SYNC, and they have ~25+ callers in StandardNiFiServiceFacade. The vast majority of those callers don't actually consume working configuration; they only touch:

  • connectorNode.getActiveFlowContext().getManagedProcessGroup() to find a Processor / ControllerService / Connection / Process Group, e.g.
    private ProcessorNode locateConnectorProcessor(final String connectorId, final String processorId) {
        final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId);
        final ProcessGroup managedGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup();
        ...
    }

    private ControllerServiceNode locateConnectorControllerService(final String connectorId, final String controllerServiceId) {
        final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId);
        final ProcessGroup managedGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup();
        ...
    }
  • connectorNode.getCurrentState() for status checks (verifyDrainConnector, createConnectorStatusDto).
  • dtoFactory.createPermissionsDto(node) — uses identity, not config.

Active flow context, current state, and permissions are all NiFi-owned local state. None require a provider.load. For these callers SYNC_WITH_PROVIDER is pure overhead — every call to find a child processor, render a status DTO, or build permissions performs a network round trip to the provider.

The legitimate "needs working config" callers are the ones that actually serialize the connector via dtoFactory.createConnectorDto(node) for return to a REST client (e.g. getConnector(id), getConnectorConfigurationStep, getConnectors()). The DTO factory walks the working flow context, so for those a sync is justified.

A worthwhile follow-up is to give ConnectorDAO.getConnector(String) the same explicit-mode treatment that this PR already gave to the new getConnector(String, ConnectorSyncMode) overload, then update StandardNiFiServiceFacade callers individually. The "locate-a-component" and "build-status" call sites should pass LOCAL_ONLY; the "serialize-for-REST-response" sites stay on SYNC_WITH_PROVIDER.
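A minimal sketch of that follow-up, with hypothetical simplified types (not the real NiFi signatures):

```java
import java.util.ArrayList;
import java.util.List;

enum ConnectorSyncMode { LOCAL_ONLY, SYNC_WITH_PROVIDER }

interface ConnectorLookup {
    String getConnector(String id, ConnectorSyncMode mode);
}

class SketchDAO {
    private final ConnectorLookup repository;

    SketchDAO(ConnectorLookup repository) {
        this.repository = repository;
    }

    // Explicit-mode lookup: each caller states whether it needs provider state.
    String getConnector(String id, ConnectorSyncMode mode) {
        return repository.getConnector(id, mode);
    }

    // Locate-a-component / build-status callers: NiFi-owned local state only.
    String locateComponentOwner(String id) {
        return getConnector(id, ConnectorSyncMode.LOCAL_ONLY);
    }

    // Serialize-for-REST-response callers: must reflect the latest provider config.
    String readForRestResponse(String id) {
        return getConnector(id, ConnectorSyncMode.SYNC_WITH_PROVIDER);
    }
}
```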

B. Post-mutation reads that re-sync after the mutation already synced

Several REST flows in StandardNiFiServiceFacade follow this pattern: do a write, then immediately read for the response DTO. The write already touched the provider; the read syncs it again.

updateConnectorConfigurationStep:

        final RevisionUpdate<ConnectorDTO> snapshot = revisionManager.updateRevision(claim, user, () -> {
            // Update the configuration step
            connectorDAO.updateConnectorConfigurationStep(id, configurationStepName, configurationStepConfiguration);
            controllerFacade.save();

            // Return updated connector DTO
            final ConnectorNode node = connectorDAO.getConnector(id);

updateConnectorConfigurationStep → repo.configureConnector → provider.load (in buildMergedWorkingConfiguration) + provider.save. Then the immediately-following connectorDAO.getConnector(id) triggers another provider.load. That's load + save + load for a single REST step update.

applyConnectorUpdate:

        final RevisionUpdate<ConnectorDTO> snapshot = revisionManager.updateRevision(claim, user, () -> {
            connectorDAO.applyConnectorUpdate(connectorId, updateContext);

            final ConnectorNode node = connectorDAO.getConnector(connectorId);
            ...
        });

        final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId());

applyConnectorUpdate → repo.applyUpdate synchronously calls syncAssetsFromProvider (provider.syncAssets + provider.load) before submitting the async update. Then connectorDAO.getConnector(connectorId) syncs again. Then another connectorDAO.getConnector(...) outside the revision block syncs a third time. Three loads for one apply.

updateConnector (rename):

        final RevisionUpdate<ConnectorDTO> snapshot = revisionManager.updateRevision(claim, user, () -> {
            connectorDAO.updateConnector(connectorDTO);

            controllerFacade.save();

            final ConnectorNode node = connectorDAO.getConnector(connectorDTO.getId());
            ...
        });

        final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId());

connectorDAO.updateConnector itself does requireConnector(id, SYNC_WITH_PROVIDER) (load) + repo.updateConnector (save). Then the inner getConnector(id) re-syncs (load), then the outer getConnector(id) re-syncs again (load). Three loads + one save for a rename.

scheduleConnector (start/stop):

        final RevisionUpdate<ConnectorDTO> snapshot = revisionManager.updateRevision(claim, user, () -> {
            switch (state) {
                case RUNNING -> connectorDAO.startConnector(id);
                case STOPPED -> connectorDAO.stopConnector(id);
                ...
            }
            ...
            final ConnectorNode node = connectorDAO.getConnector(id);
            ...
        });

        final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId());

If the start path is left as SYNC (current PR), this is sync (start) + sync (read) + sync (read). If the start path is fixed to LOCAL_ONLY per question 1, it's just two reads, both currently syncing.

drainConnector / cancelConnectorDrain: the same two-read tail pattern.

These post-mutation reads are an obvious target for the new connectorDAO.getConnector(id, ConnectorSyncMode.LOCAL_ONLY) overload — once the mutation has already round-tripped the provider, the response DTO build does not need another load.
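The shape of that fix can be sketched as follows (hypothetical simplified types; the load counter only illustrates the round-trip accounting):

```java
enum ConnectorSyncMode { LOCAL_ONLY, SYNC_WITH_PROVIDER }

class SketchFacade {
    int providerLoads = 0; // counts simulated provider.load round trips

    // The mutation itself already round-trips the provider (e.g. repo.applyUpdate).
    void applyConnectorUpdate(String id) {
        providerLoads++;
    }

    String getConnector(String id, ConnectorSyncMode mode) {
        if (mode == ConnectorSyncMode.SYNC_WITH_PROVIDER) {
            providerLoads++;
        }
        return id;
    }

    // Post-mutation read for the response DTO stays LOCAL_ONLY: one load total.
    String updateAndBuildResponse(String id) {
        applyConnectorUpdate(id);
        return getConnector(id, ConnectorSyncMode.LOCAL_ONLY);
    }
}
```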

C. The rename path (StandardConnectorDAO.updateConnector) — the "sync first or risk overwriting" pattern is misplaced

    @Override
    public void updateConnector(final ConnectorDTO connectorDTO) {
        // Rename writes the in-memory working configuration back to the provider; sync first so we
        // do not overwrite changes the provider made out-of-band with stale local state.
        final ConnectorNode connector = requireConnector(connectorDTO.getId(), ConnectorSyncMode.SYNC_WITH_PROVIDER);
        if (connectorDTO.getName() != null) {
            getConnectorRepository().updateConnector(connector, connectorDTO.getName());
        }
    }

This SYNC is correct for the reason the comment states — without it, repo.updateConnector builds the outgoing ConnectorWorkingConfiguration from local state and clobbers any out-of-band change in the provider. But it's a layering smell: the "load-then-modify-then-save" responsibility is split across the DAO (load) and the repository (modify+save).

Compare with repo.configureConnector, which puts the whole load-modify-save cycle inside the repository:

    @Override
    public void configureConnector(final ConnectorNode connector, final String stepName, final StepConfiguration configuration) throws FlowUpdateException {
        if (configurationProvider != null) {
            final ConnectorWorkingConfiguration mergedConfiguration = buildMergedWorkingConfiguration(connector, stepName, configuration);
            configurationProvider.save(connector.getIdentifier(), mergedConfiguration);
        }

        connector.setConfiguration(stepName, configuration);
        logger.info("Successfully configured {} for step {}", connector, stepName);
    }

buildMergedWorkingConfiguration does its own provider.load. The DAO only needs LOCAL_ONLY before calling it. This is exactly why StandardConnectorDAO.updateConnectorConfigurationStep correctly uses LOCAL_ONLY.

If repo.updateConnector were reshaped the same way — provider.load for the existing config, set the new name on the loaded copy, provider.save — then StandardConnectorDAO.updateConnector could also drop to LOCAL_ONLY and the rename path would be perfectly symmetric with the step-update path. Same number of provider calls (one load, one save), responsibility colocated, and the DAO would be uniformly LOCAL_ONLY on every write path.

Worth raising as a small refactor in this PR or a follow-up.
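One possible shape for that refactor, again with simplified stand-in types (the real provider API may differ):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Stand-in for the ConnectorConfigurationProvider; a config is just a map here.
class SketchProvider {
    private final Map<String, Map<String, String>> store = new HashMap<>();

    Optional<Map<String, String>> load(String id) {
        return Optional.ofNullable(store.get(id)).map(HashMap::new);
    }

    void save(String id, Map<String, String> config) {
        store.put(id, new HashMap<>(config));
    }
}

class SketchRepo {
    final SketchProvider provider = new SketchProvider();

    // Load-modify-save colocated in the repository, mirroring configureConnector:
    // the provider's copy is the base, so out-of-band changes are never clobbered.
    void updateConnector(String id, String newName, Map<String, String> localConfig) {
        Map<String, String> config = provider.load(id)
                .orElseGet(() -> new HashMap<>(localConfig)); // no provider record: fall back to local state
        config.put("name", newName);
        provider.save(id, config);
    }
}
```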

Summary of unnecessary or misplaced syncs

  1. Start paths (covered in question 1): three SYNC sites that don't change what gets started. Drop to LOCAL_ONLY.

  2. ConnectorDAO.getConnector(String) / getConnectors() defaulting to SYNC: ~25 StandardNiFiServiceFacade call sites, of which only the handful that actually serialize a connector DTO for a REST response need the working config. The rest (locate-a-component, build status, build permissions) only touch local active-context state. Best fix: parameterize the default-arg DAO methods (or the call sites) and pass LOCAL_ONLY where appropriate.

  3. Post-mutation re-reads in StandardNiFiServiceFacade (applyConnectorUpdate, updateConnector rename, updateConnectorConfigurationStep, scheduleConnector, drainConnector, cancelConnectorDrain, discardConnectorUpdate): the mutation already round-tripped the provider; the response-DTO read should pass LOCAL_ONLY. This is the simplest concrete win using the new overload introduced by the PR.

  4. Rename path layering: the current SYNC_WITH_PROVIDER in StandardConnectorDAO.updateConnector papers over a layering issue. Pushing the load into repo.updateConnector (matching repo.configureConnector) eliminates the need for the DAO sync, makes the DAO uniformly LOCAL_ONLY for writes, and is more consistent.

  5. Repository's own addConnector(connector) is also worth a brief look — addConnector calls syncFromProvider before insertion. That's appropriate when adding a freshly-created connector that the provider may know about, but it's worth confirming none of the callers in this PR's diff already did a sync upstream that this would duplicate. (I didn't see any — flagging it only for completeness, not as an issue.)

final List<ConnectorNode> connectorList = List.copyOf(connectors.values());
for (final ConnectorNode connector : connectorList) {
    syncFromProvider(connector);
    if (syncMode == ConnectorSyncMode.SYNC_WITH_PROVIDER) {
Contributor


This assumes that a null value for syncMode will default to LOCAL_ONLY. It might be preferable to require that arg is not null.

Contributor Author


Good call — added Objects.requireNonNull(syncMode, "syncMode is required") to both getConnector(String, ConnectorSyncMode) and getConnectors(ConnectorSyncMode) so a missing mode now fails fast instead of silently behaving as LOCAL_ONLY.
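The fail-fast guard described above is the standard Objects.requireNonNull pattern; a minimal stand-alone sketch (simplified types, not the real repository):

```java
import java.util.Objects;

enum ConnectorSyncMode { LOCAL_ONLY, SYNC_WITH_PROVIDER }

class SketchRepository {
    // A null mode now fails fast instead of silently behaving as LOCAL_ONLY.
    String getConnector(String id, ConnectorSyncMode syncMode) {
        Objects.requireNonNull(syncMode, "syncMode is required");
        if (syncMode == ConnectorSyncMode.SYNC_WITH_PROVIDER) {
            // provider.load(id) would run here
        }
        return id;
    }
}
```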

Comment on lines 2419 to 2421
// The connector is about to begin running; ensure it starts with the latest provider configuration.
connectorRepository.getConnector(connectorNode.getIdentifier(), ConnectorSyncMode.SYNC_WITH_PROVIDER);
connectorRepository.startConnector(connectorNode);
Contributor


Starting a connector only utilizes the activeFlowContext, which is not managed by the ConnectorConfigurationProvider, so this additional getConnector(...) call is not needed; it can just start.

Contributor Author


Confirmed. startConnector(ConnectorNode) only relies on the active flow context, which the provider doesn't manage, so the pre-start getConnector(..., SYNC_WITH_PROVIDER) was pure overhead. Removed it; now this path just calls connectorRepository.startConnector(connectorNode) directly when the controller is initialized.

try {
final ConnectorNode existingConnector = connectorRepository.getConnector(connectorNode.getIdentifier());
// The connector is about to begin running; ensure it starts with the latest provider configuration.
final ConnectorNode existingConnector = connectorRepository.getConnector(connectorNode.getIdentifier(), ConnectorSyncMode.SYNC_WITH_PROVIDER);
Contributor


Starting a connector only utilizes the activeFlowContext, which is not managed by the ConnectorConfigurationProvider, so this additional getConnector(...) call is not needed; it can just start.

Contributor Author


Fixed. The deferred-start loop in onFlowInitialized no longer pre-syncs; the pre-loop getConnector(..., SYNC_WITH_PROVIDER) (used only to detect whether the connector still exists) is now LOCAL_ONLY, since the start path itself doesn't read provider state.

Comment on lines 134 to 136
// The connector is about to begin running; ensure it is started with the latest provider configuration.
final ConnectorNode connector = requireConnector(id, ConnectorSyncMode.SYNC_WITH_PROVIDER);
getConnectorRepository().startConnector(connector);
Contributor


Starting a connector only utilizes the activeFlowContext, which is not managed by the ConnectorConfigurationProvider, so this additional getConnector(...) call is not needed; it can just start.

Contributor Author


Done. StandardConnectorDAO.startConnector(String) now uses LOCAL_ONLY to look up the connector node before delegating to the repository, since the start operation itself only reads active flow context.

Comment on lines +118 to 123
final ConnectorNode connector = requireConnector(connectorDTO.getId(), ConnectorSyncMode.SYNC_WITH_PROVIDER);
if (connectorDTO.getName() != null) {
getConnectorRepository().updateConnector(connector, connectorDTO.getName());
}
Contributor


The sync happens in the repository, which is where it should happen for the update. If you want to ensure the connector exists before calling getConnectorRepository().updateConnector(connector, connectorDTO.getName()), I would use the LOCAL_ONLY sync mode, because the full connector sync with the provider will happen as part of calling `updateConnector`.

Contributor Author


Refactored to match repo.configureConnector. StandardConnectorRepository.updateConnector(connector, name) now does its own provider.load (falling back to a locally-built working configuration when the provider has no record), sets the new name, then provider.save. With the load+save responsibility colocated in the repository, StandardConnectorDAO.updateConnector drops to LOCAL_ONLY, so every DAO write path is now uniformly LOCAL_ONLY while still preserving the no-clobber guarantee for out-of-band provider changes.

@markap14
Contributor Author

@kevdoran thanks for the careful walk-through — really appreciated the per-method classification and the catalog of redundant syncs. I pushed 146f89be67c with all of your suggestions implemented:

1. Start paths (3 sites) — dropped SYNC_WITH_PROVIDER:

  • FlowController#startConnector(ConnectorNode) — removed the pre-start getConnector(..., SYNC_WITH_PROVIDER) entirely; the start path doesn't read provider state.
  • FlowController#onFlowInitialized deferred-start loop — the existence check is now LOCAL_ONLY.
  • StandardConnectorDAO#startConnector(String) — switched to LOCAL_ONLY.

2. Objects.requireNonNull(syncMode) — added to both StandardConnectorRepository.getConnector(String, ConnectorSyncMode) and getConnectors(ConnectorSyncMode) so a missing mode fails fast.

3. Rename path layering — StandardConnectorRepository.updateConnector(connector, name) now does its own provider.load (falling back to a locally-built working configuration when the provider has no record) before provider.save, mirroring configureConnector. StandardConnectorDAO.updateConnector drops to LOCAL_ONLY accordingly. Every DAO write path is now uniformly LOCAL_ONLY while the no-clobber guarantee for out-of-band provider changes is preserved by the repository.

4. StandardNiFiServiceFacade callers — converted via the existing getConnector(id, ConnectorSyncMode) overload:

  • Locators (6): locateConnectorProcessor, locateConnectorControllerService, getConnectorFlow, getConnectorProcessGroupStatus, getConnectorControllerServices, searchConnector — all LOCAL_ONLY (only touch active flow context).
  • State check (1): verifyDrainConnector — LOCAL_ONLY.
  • Post-mutation re-reads (14): the inside-revision-block and outside-revision-block reads in createConnector, updateConnector (rename), scheduleConnector, drainConnector, cancelConnectorDrain, updateConnectorConfigurationStep, applyConnectorUpdate, and discardConnectorUpdate — all LOCAL_ONLY. The mutation already round-tripped the provider; the response-DTO read no longer triggers another load.
  • Public reads kept on SYNC: the four "serialize for REST response" sites you flagged — getConnectors(), getConnector(id), getConnectorConfigurationSteps, getConnectorConfigurationStep. The pre-delete read in deleteConnector also stays on SYNC for response-DTO symmetry.

5. Bonus — ConnectorAuditor AOP advice (5 sites) — these AOP @Around methods on removeConnector, startConnector, stopConnector, updateConnectorConfigurationStep, and applyConnectorUpdate were calling the no-arg connectorDAO.getConnector(id) (which defaulted to SYNC) just to capture local audit metadata around every mutation. Without changing them, every mutation would still pay one provider round trip via the auditor even though the DAO method itself is now LOCAL_ONLY. Switched all five to LOCAL_ONLY.

6. addConnector(connector) syncing — agree that's fine; nothing in the diff calls it after an upstream provider sync, so no duplicate work.
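The auditor change in item 5 amounts to this pattern (a rough stand-in for the @Around advice, not the real ConnectorAuditor code):

```java
enum ConnectorSyncMode { LOCAL_ONLY, SYNC_WITH_PROVIDER }

class SketchAuditor {
    int providerLoads = 0; // counts simulated provider.load round trips

    String getConnector(String id, ConnectorSyncMode mode) {
        if (mode == ConnectorSyncMode.SYNC_WITH_PROVIDER) {
            providerLoads++;
        }
        return "name-of-" + id;
    }

    // Stand-in for the @Around advice: capture audit metadata from local state,
    // run the mutation, then build the audit record with no provider round trip.
    String auditedMutation(String id, Runnable mutation) {
        String nameBefore = getConnector(id, ConnectorSyncMode.LOCAL_ONLY);
        mutation.run();
        return nameBefore;
    }
}
```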

Build and tests are green:

  • nifi-framework-core — 543 tests, 0 failures.
  • nifi-web-api — 529 tests, 0 failures (added testGetConnectorLocalOnly and testGetConnectorLocalOnlyWithNonExistentId for the new DAO overload, updated stubs/verifies in StandardNiFiServiceFacadeTest for the seven Group A sites that now pass ConnectorSyncMode.LOCAL_ONLY).
  • checkstyle:check pmd:check clean on both modules.

Contributor

@kevdoran kevdoran left a comment


Thanks for those changes @markap14. +1 from me. There seem to be some unrelated system check failures that are not blocking. Will get this merged.

@kevdoran kevdoran merged commit fc08aa8 into apache:main May 12, 2026
15 of 17 checks passed