NIFI-15919: Ensure that we only sync Connectors with Connector Configuration Provider when necessary #11231
Conversation
02ad8a1 to 71ba618
[claude-opus-4.7] CI failure on `ubuntu-24.04 Java 21` is a pre-existing system-test flake, not related to this PR. Failing job: system-tests / ubuntu-24.04 Java 21. Tests that errored: 8 total, all with the same error.

Each test fails in under a second on the very first API call to node-2; the troubleshooting log shows the cluster coordinator believes both nodes are CONNECTED while node-2 itself answers 409 "disconnected". Why this is a pre-existing flake, not caused by the PR: the tests share a cached two-node NiFi instance, and the inconsistency carries over from a prior test in the same run.

Plan: once the parent `system-tests` run finishes, I'll rerun the failed shard.
[claude-opus-4.7] Reran the failed shard.
kevdoran left a comment
Aside from the inline comments, here is general PR feedback, mainly around cases where LOCAL_ONLY should be used and SYNC_WITH_PROVIDER is not needed:
Good question. Let me classify the methods on StandardConnectorRepository by whether they internally trigger a provider load, then walk the SYNC_WITH_PROVIDER call sites and grade each one.
Repository methods that already do (or should do) a provider load internally
From StandardConnectorRepository.java:
| Method | Internal provider activity |
|---|---|
| `addConnector(connector)` | calls `syncFromProvider(connector)` — `provider.load` |
| `getConnector(id, SYNC_WITH_PROVIDER)` | `provider.load` |
| `getConnectors(SYNC_WITH_PROVIDER)` | `provider.load` × N |
| `applyUpdate(connector, ctx)` | calls `syncAssetsFromProvider` → `provider.syncAssets` + `provider.load` (synchronously, before the async submit) |
| `configureConnector(connector, step, cfg)` | `buildMergedWorkingConfiguration` calls `provider.load`, then `provider.save` |
| `syncAssetsFromProvider(connector)` | `provider.syncAssets` + `provider.load` |
| `syncConnector(versionedConnector)` | `provider.getSyncDirective` |
| `removeConnector(id)` | `provider.delete` |
| `updateConnector(connector, name)` | `provider.save` only — no load |
| `discardWorkingConfiguration(connector)` | `provider.discard` |
| `startConnector` / `stopConnector` / `restartConnector` | none |
| `inheritConfiguration` | none |
Walking each SYNC_WITH_PROVIDER use in the PR
The PR introduces SYNC_WITH_PROVIDER in six places. Three of them — the start paths — were already covered in the previous summary as functionally wrong because the provider doesn't own the active config. Setting those aside, here's the rest:
A. StandardConnectorDAO.getConnector(String id) (no-arg default → SYNC)
```java
@Override
public ConnectorNode getConnector(final String id) {
    // Public read returned to clients; must reflect the latest configuration from the provider.
    return requireConnector(id, ConnectorSyncMode.SYNC_WITH_PROVIDER);
}

@Override
public List<ConnectorNode> getConnectors() {
    // Public read returned to clients; must reflect the latest configuration from the provider.
    return getConnectorRepository().getConnectors(ConnectorSyncMode.SYNC_WITH_PROVIDER);
}
```

These default to SYNC, and they have ~25+ callers in StandardNiFiServiceFacade. The vast majority of those callers don't actually consume working configuration; they only touch:
- `connectorNode.getActiveFlowContext().getManagedProcessGroup()` to find a Processor / ControllerService / Connection / Process Group, e.g.

```java
private ProcessorNode locateConnectorProcessor(final String connectorId, final String processorId) {
    final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId);
    final ProcessGroup managedGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup();
    ...
}

private ControllerServiceNode locateConnectorControllerService(final String connectorId, final String controllerServiceId) {
    final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId);
    final ProcessGroup managedGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup();
    ...
}
```

- `connectorNode.getCurrentState()` for status checks (`verifyDrainConnector`, `createConnectorStatusDto`).
- `dtoFactory.createPermissionsDto(node)` — uses identity, not config.
Active flow context, current state, and permissions are all NiFi-owned local state. None require a provider.load. For these callers SYNC_WITH_PROVIDER is pure overhead — every call to find a child processor, render a status DTO, or build permissions performs a network round trip to the provider.
The legitimate "needs working config" callers are the ones that actually serialize the connector via dtoFactory.createConnectorDto(node) for return to a REST client (e.g. getConnector(id), getConnectorConfigurationStep, getConnectors()). The DTO factory walks the working flow context, so for those a sync is justified.
A worthwhile follow-up is to give ConnectorDAO.getConnector(String) the same explicit-mode treatment that this PR already gave to the new getConnector(String, ConnectorSyncMode) overload, then update StandardNiFiServiceFacade callers individually. The "locate-a-component" and "build-status" call sites should pass LOCAL_ONLY; the "serialize-for-REST-response" sites stay on SYNC_WITH_PROVIDER.
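To make the proposed split concrete, here is a minimal, self-contained sketch — `SyncMode`, `FakeProvider`, and `ConnectorLookup` are illustrative stand-ins, not NiFi's actual classes. Locate-style reads pass LOCAL_ONLY and generate no provider traffic; serialize-for-REST reads keep the round trip.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of an explicit-mode lookup. Only serialize-for-REST
// reads pay for a provider round trip; locate-a-component reads stay local.
class ConnectorLookupSketch {
    enum SyncMode { LOCAL_ONLY, SYNC_WITH_PROVIDER }

    // Stands in for the connector configuration provider; counts remote loads.
    static class FakeProvider {
        final AtomicInteger loads = new AtomicInteger();
        String load(String id) { loads.incrementAndGet(); return "config-for-" + id; }
    }

    static class ConnectorLookup {
        final FakeProvider provider;
        ConnectorLookup(FakeProvider provider) { this.provider = provider; }

        String getConnector(String id, SyncMode mode) {
            Objects.requireNonNull(mode, "syncMode is required");
            if (mode == SyncMode.SYNC_WITH_PROVIDER) {
                provider.load(id); // network round trip to the provider
            }
            return id; // local node lookup elided
        }
    }

    // Locate-a-component path: only needs NiFi-owned local state.
    static int locateProviderLoads() {
        FakeProvider provider = new FakeProvider();
        new ConnectorLookup(provider).getConnector("c1", SyncMode.LOCAL_ONLY);
        return provider.loads.get(); // no provider traffic
    }

    // Serialize-for-REST path: must reflect the latest provider config.
    static int serializeProviderLoads() {
        FakeProvider provider = new FakeProvider();
        new ConnectorLookup(provider).getConnector("c1", SyncMode.SYNC_WITH_PROVIDER);
        return provider.loads.get(); // one load
    }

    public static void main(String[] args) {
        System.out.println(locateProviderLoads() + " " + serializeProviderLoads());
    }
}
```

The design point is simply that the mode is chosen at the call site, so the ~25 locate/status/permissions callers stop paying for loads they never read.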
B. Post-mutation reads that re-sync after the mutation already synced
Several REST flows in StandardNiFiServiceFacade follow this pattern: do a write, then immediately read for the response DTO. The write already touched the provider; the read syncs it again.
updateConnectorConfigurationStep:
```java
final RevisionUpdate<ConnectorDTO> snapshot = revisionManager.updateRevision(claim, user, () -> {
    // Update the configuration step
    connectorDAO.updateConnectorConfigurationStep(id, configurationStepName, configurationStepConfiguration);
    controllerFacade.save();

    // Return updated connector DTO
    final ConnectorNode node = connectorDAO.getConnector(id);
    ...
});
```

`updateConnectorConfigurationStep` → `repo.configureConnector` → `provider.load` (in `buildMergedWorkingConfiguration`) + `provider.save`. Then the immediately-following `connectorDAO.getConnector(id)` triggers another `provider.load`. That's load+save+load for a single REST step update.
applyConnectorUpdate:
```java
final RevisionUpdate<ConnectorDTO> snapshot = revisionManager.updateRevision(claim, user, () -> {
    connectorDAO.applyConnectorUpdate(connectorId, updateContext);
    final ConnectorNode node = connectorDAO.getConnector(connectorId);
    ...
});
final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId());
```

`applyConnectorUpdate` → `repo.applyUpdate` synchronously calls `syncAssetsFromProvider` (`provider.syncAssets` + `provider.load`) before submitting the async update. Then `connectorDAO.getConnector(connectorId)` syncs again. Then another `connectorDAO.getConnector(...)` outside the revision block syncs a third time. Three loads for one apply.
updateConnector (rename):
```java
final RevisionUpdate<ConnectorDTO> snapshot = revisionManager.updateRevision(claim, user, () -> {
    connectorDAO.updateConnector(connectorDTO);
    controllerFacade.save();
    final ConnectorNode node = connectorDAO.getConnector(connectorDTO.getId());
    ...
});
final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId());
```

`connectorDAO.updateConnector` itself does `requireConnector(id, SYNC_WITH_PROVIDER)` (load) + `repo.updateConnector` (save). Then the inner `getConnector(id)` re-syncs (load), then the outer `getConnector(id)` re-syncs again (load). Three loads + one save for a rename.
scheduleConnector (start/stop):
```java
final RevisionUpdate<ConnectorDTO> snapshot = revisionManager.updateRevision(claim, user, () -> {
    switch (state) {
        case RUNNING -> connectorDAO.startConnector(id);
        case STOPPED -> connectorDAO.stopConnector(id);
        ...
    }
    ...
    final ConnectorNode node = connectorDAO.getConnector(id);
    ...
});
final ConnectorNode node = connectorDAO.getConnector(snapshot.getComponent().getId());
```

If the start path is left as SYNC (current PR), this is sync (start) + sync (read) + sync (read). If the start path is fixed to LOCAL_ONLY per question 1, it's just two reads, both currently syncing.
drainConnector / cancelConnectorDrain: the same two-read tail pattern.
These post-mutation reads are an obvious target for the new connectorDAO.getConnector(id, ConnectorSyncMode.LOCAL_ONLY) overload — once the mutation has already round-tripped the provider, the response DTO build does not need another load.
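The saving can be sketched with hypothetical stand-in names (not NiFi's real types): the mutation already costs one load and one save, so reading back with LOCAL_ONLY trims the per-request provider calls from three to two.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Counts provider round trips for the write-then-read REST pattern
// described above. All names here are illustrative stand-ins.
class PostMutationReadSketch {
    static final AtomicInteger providerCalls = new AtomicInteger();

    static void providerLoad() { providerCalls.incrementAndGet(); }
    static void providerSave() { providerCalls.incrementAndGet(); }

    // The mutation itself round-trips the provider (load + save),
    // mirroring what repo.configureConnector already does.
    static void updateConfigurationStep(String id) {
        providerLoad();
        providerSave();
    }

    // Current pattern: the response-DTO read syncs again.
    static int currentPatternCalls() {
        providerCalls.set(0);
        updateConfigurationStep("c1");
        providerLoad(); // getConnector(id) defaulting to SYNC_WITH_PROVIDER
        return providerCalls.get();
    }

    // Proposed pattern: the response-DTO read passes LOCAL_ONLY.
    static int proposedPatternCalls() {
        providerCalls.set(0);
        updateConfigurationStep("c1");
        // getConnector(id, LOCAL_ONLY): no provider traffic
        return providerCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(currentPatternCalls() + " vs " + proposedPatternCalls());
    }
}
```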
C. The rename path (StandardConnectorDAO.updateConnector) — the "sync first or risk overwriting" pattern is misplaced
```java
@Override
public void updateConnector(final ConnectorDTO connectorDTO) {
    // Rename writes the in-memory working configuration back to the provider; sync first so we
    // do not overwrite changes the provider made out-of-band with stale local state.
    final ConnectorNode connector = requireConnector(connectorDTO.getId(), ConnectorSyncMode.SYNC_WITH_PROVIDER);
    if (connectorDTO.getName() != null) {
        getConnectorRepository().updateConnector(connector, connectorDTO.getName());
    }
}
```

This SYNC is correct for the reason the comment states — without it, `repo.updateConnector` builds the outgoing `ConnectorWorkingConfiguration` from local state and clobbers any out-of-band change in the provider. But it's a layering smell: the load-then-modify-then-save responsibility is split across the DAO (load) and the repository (modify+save).
Compare with repo.configureConnector, which puts the whole load-modify-save cycle inside the repository:
```java
@Override
public void configureConnector(final ConnectorNode connector, final String stepName, final StepConfiguration configuration) throws FlowUpdateException {
    if (configurationProvider != null) {
        final ConnectorWorkingConfiguration mergedConfiguration = buildMergedWorkingConfiguration(connector, stepName, configuration);
        configurationProvider.save(connector.getIdentifier(), mergedConfiguration);
    }
    connector.setConfiguration(stepName, configuration);
    logger.info("Successfully configured {} for step {}", connector, stepName);
}
```

`buildMergedWorkingConfiguration` does its own `provider.load`. The DAO only needs LOCAL_ONLY before calling it. This is exactly why `StandardConnectorDAO.updateConnectorConfigurationStep` correctly uses LOCAL_ONLY.
If repo.updateConnector were reshaped the same way — provider.load for the existing config, set the new name on the loaded copy, provider.save — then StandardConnectorDAO.updateConnector could also drop to LOCAL_ONLY and the rename path would be perfectly symmetric with the step-update path. Same number of provider calls (one load, one save), responsibility colocated, and the DAO would be uniformly LOCAL_ONLY on every write path.
Worth raising as a small refactor in this PR or a follow-up.
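A sketch of what the reshaped repository method could look like — `FakeProvider`, `WorkingConfig`, and `Repository` are hypothetical stand-ins, not the actual NiFi classes. The rename loads the provider's copy, mutates only the name, and saves, so an out-of-band provider edit to any other field survives.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative-only model of the proposed load-modify-save rename,
// with the whole cycle owned by the repository layer.
class RenameRepositorySketch {
    record WorkingConfig(String name, String settings) { }

    static class FakeProvider {
        final Map<String, WorkingConfig> store = new HashMap<>();
        WorkingConfig load(String id) { return store.get(id); }
        void save(String id, WorkingConfig cfg) { store.put(id, cfg); }
    }

    static class Repository {
        final FakeProvider provider;
        Repository(FakeProvider provider) { this.provider = provider; }

        // Load the provider's current config (never a stale local copy),
        // set the new name on the loaded copy, then save. Out-of-band
        // provider edits to other fields are preserved.
        void updateConnector(String id, String newName) {
            WorkingConfig existing = provider.load(id);
            if (existing == null) {
                // Fall back to a locally-built working configuration
                // when the provider has no record of the connector.
                existing = new WorkingConfig(null, "");
            }
            provider.save(id, new WorkingConfig(newName, existing.settings()));
        }
    }

    static String renameResult() {
        FakeProvider provider = new FakeProvider();
        // Simulate an out-of-band provider edit the rename must not clobber.
        provider.save("c1", new WorkingConfig("old-name", "edited-out-of-band"));
        new Repository(provider).updateConnector("c1", "new-name");
        WorkingConfig result = provider.load("c1");
        return result.name() + "/" + result.settings();
    }

    public static void main(String[] args) {
        System.out.println(renameResult());
    }
}
```

With this shape the DAO caller needs only a LOCAL_ONLY existence check, exactly as the step-update path already works.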
Summary of unnecessary or misplaced syncs
- Start paths (covered in question 1): three SYNC sites that don't change what gets started. Drop to LOCAL_ONLY.
- `ConnectorDAO.getConnector(String)` / `getConnectors()` defaulting to SYNC: ~25 `StandardNiFiServiceFacade` call sites, of which only the handful that actually serialize a connector DTO for a REST response need the working config. The rest (locate-a-component, build status, build permissions) only touch local active-context state. Best fix: parameterize the default-arg DAO methods (or the call sites) and pass LOCAL_ONLY where appropriate.
- Post-mutation re-reads in `StandardNiFiServiceFacade` (`applyConnectorUpdate`, `updateConnector` rename, `updateConnectorConfigurationStep`, `scheduleConnector`, `drainConnector`, `cancelConnectorDrain`, `discardConnectorUpdate`): the mutation already round-tripped the provider; the response-DTO read should pass `LOCAL_ONLY`. This is the simplest concrete win using the new overload introduced by the PR.
- Rename path layering: the current `SYNC_WITH_PROVIDER` in `StandardConnectorDAO.updateConnector` papers over a layering issue. Pushing the load into `repo.updateConnector` (matching `repo.configureConnector`) eliminates the need for the DAO sync, makes the DAO uniformly `LOCAL_ONLY` for writes, and is more consistent.
- The repository's own `addConnector(connector)` is also worth a brief look — `addConnector` calls `syncFromProvider` before insertion. That's appropriate when adding a freshly-created connector that the provider may know about, but it's worth confirming none of the callers in this PR's diff already did a sync upstream that this would duplicate. (I didn't see any — flagging it only for completeness, not as an issue.)
```diff
  final List<ConnectorNode> connectorList = List.copyOf(connectors.values());
  for (final ConnectorNode connector : connectorList) {
-     syncFromProvider(connector);
+     if (syncMode == ConnectorSyncMode.SYNC_WITH_PROVIDER) {
```
This assumes that a null value for syncMode will default to LOCAL_ONLY. It might be preferable to require that arg is not null.
Good call — added Objects.requireNonNull(syncMode, "syncMode is required") to both getConnector(String, ConnectorSyncMode) and getConnectors(ConnectorSyncMode) so a missing mode now fails fast instead of silently behaving as LOCAL_ONLY.
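For illustration only (stand-in names, not the actual NiFi signature), the fail-fast guard behaves like this: a null `syncMode` now throws immediately with a clear message instead of silently acting as LOCAL_ONLY.

```java
import java.util.Objects;

// Minimal demonstration of the Objects.requireNonNull guard described above.
class SyncModeGuardSketch {
    enum SyncMode { LOCAL_ONLY, SYNC_WITH_PROVIDER }

    static String getConnector(String id, SyncMode mode) {
        // Fail fast on a missing mode rather than defaulting to LOCAL_ONLY.
        Objects.requireNonNull(mode, "syncMode is required");
        return id;
    }

    static String nullModeMessage() {
        try {
            getConnector("c1", null);
            return "no exception";
        } catch (NullPointerException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(nullModeMessage());
    }
}
```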
```java
// The connector is about to begin running; ensure it starts with the latest provider configuration.
connectorRepository.getConnector(connectorNode.getIdentifier(), ConnectorSyncMode.SYNC_WITH_PROVIDER);
connectorRepository.startConnector(connectorNode);
```
Starting a connector only utilized the activeFlowContext, which is not managed by the ConnectorConfigurationProvider, so this additional getConnector(...) call is not needed, can just start.
Confirmed. startConnector(ConnectorNode) only relies on the active flow context, which the provider doesn't manage, so the pre-start getConnector(..., SYNC_WITH_PROVIDER) was pure overhead. Removed it; now this path just calls connectorRepository.startConnector(connectorNode) directly when the controller is initialized.
```diff
  try {
-     final ConnectorNode existingConnector = connectorRepository.getConnector(connectorNode.getIdentifier());
+     // The connector is about to begin running; ensure it starts with the latest provider configuration.
+     final ConnectorNode existingConnector = connectorRepository.getConnector(connectorNode.getIdentifier(), ConnectorSyncMode.SYNC_WITH_PROVIDER);
```
Starting a connector only utilized the activeFlowContext, which is not managed by the ConnectorConfigurationProvider, so this additional getConnector(...) call is not needed, can just start.
Fixed. The deferred-start loop in onFlowInitialized no longer pre-syncs; the pre-loop getConnector(..., SYNC_WITH_PROVIDER) (used only to detect whether the connector still exists) is now LOCAL_ONLY, since the start path itself doesn't read provider state.
```java
// The connector is about to begin running; ensure it is started with the latest provider configuration.
final ConnectorNode connector = requireConnector(id, ConnectorSyncMode.SYNC_WITH_PROVIDER);
getConnectorRepository().startConnector(connector);
```
Starting a connector only utilized the activeFlowContext, which is not managed by the ConnectorConfigurationProvider, so this additional getConnector(...) call is not needed, can just start.
Done. StandardConnectorDAO.startConnector(String) now uses LOCAL_ONLY to look up the connector node before delegating to the repository, since the start operation itself only reads active flow context.
```java
final ConnectorNode connector = requireConnector(connectorDTO.getId(), ConnectorSyncMode.SYNC_WITH_PROVIDER);
if (connectorDTO.getName() != null) {
    getConnectorRepository().updateConnector(connector, connectorDTO.getName());
}
```
The sync happens in the repository, which is where it should happen for the update. If you want to ensure the connector exists before calling `getConnectorRepository().updateConnector(connector, connectorDTO.getName())`, I would use the LOCAL_ONLY sync mode, because the full connector sync with the provider will happen as part of calling `updateConnector`.
Refactored to match repo.configureConnector. StandardConnectorRepository.updateConnector(connector, name) now does its own provider.load (falling back to a locally-built working configuration when the provider has no record), sets the new name, then provider.save. With the load+save responsibility colocated in the repository, StandardConnectorDAO.updateConnector drops to LOCAL_ONLY, so every DAO write path is now uniformly LOCAL_ONLY while still preserving the no-clobber guarantee for out-of-band provider changes.
@kevdoran thanks for the careful walk-through — really appreciated the per-method classification and the catalog of redundant syncs. I pushed:

1. Start paths (3 sites) — dropped to LOCAL_ONLY
2. …
3. Rename path layering — …
4. …
5. Bonus — …
6. Build and tests are green.