diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c27667d1..6f391a122 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ All notable changes to this project will be documented in this file. - hadoop: Add precompiled hadoop for later reuse in dependent images ([#1466], [#1474]). - nifi: Add version `2.9.0` ([#1463]). +- nifi: Backport NIFI-15801 to 2.x versions ([#1481]). +- nifi: Backport NIFI-15901 to 2.x versions ([#1481]). ### Changed @@ -29,6 +31,7 @@ All notable changes to this project will be documented in this file. [#1471]: https://github.com/stackabletech/docker-images/pull/1471 [#1474]: https://github.com/stackabletech/docker-images/pull/1474 [#1476]: https://github.com/stackabletech/docker-images/pull/1476 +[#1481]: https://github.com/stackabletech/docker-images/pull/1481 ## [26.3.0] - 2026-03-16 diff --git a/nifi/stackable/patches/2.6.0/0008-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch b/nifi/stackable/patches/2.6.0/0008-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch new file mode 100644 index 000000000..34d1806c5 --- /dev/null +++ b/nifi/stackable/patches/2.6.0/0008-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch @@ -0,0 +1,117 @@ +From ec247bbf5e8b607267abaa2b302dc4a355e9767e Mon Sep 17 00:00:00 2001 +From: Andrew Kenworthy +Date: Tue, 5 May 2026 17:44:42 +0200 +Subject: NIFI-15801 Stop processors in synchronizeProcessors before updating + +--- + ...tandardVersionedComponentSynchronizer.java | 61 ++++++++++++------- + 1 file changed, 40 insertions(+), 21 deletions(-) + +diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +index b79ee4d6e8..c3e059171e 100644 +--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java ++++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +@@ -269,8 +269,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + final ProcessGroup newProcessGroup = addProcessGroup(group, processGroup, options.getComponentIdGenerator(), + additions.getParameterContexts(), additions.getParameterProviders(), group); + additionsBuilder.addProcessGroup(newProcessGroup); +- } catch (final ProcessorInstantiationException pie) { +- throw new RuntimeException(pie); ++ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { ++ throw new RuntimeException(e); + } + }); + +@@ -392,8 +392,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() == null ? group : context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId()); + synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(), + parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings()); +- } catch (final ProcessorInstantiationException pie) { +- throw new RuntimeException(pie); ++ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { ++ throw new RuntimeException(e); + } + }); + +@@ -422,7 +422,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, + final Map parameterProviderReferences, final ProcessGroup topLevelGroup, final boolean updateGroupSettings) +- throws ProcessorInstantiationException { ++ throws ProcessorInstantiationException, FlowSynchronizationException { + + // Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we + // transition the service into the RUNNING state, and then we need to update a Connection that is connected to it, +@@ -687,7 +687,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, + final Map childGroupsByVersionedId, final Map parameterProviderReferences, +- final ProcessGroup topLevelGroup) throws ProcessorInstantiationException { ++ final ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException { + + for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { + final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); +@@ -1189,21 +1189,39 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map processorsByVersionedId, + final ProcessGroup topLevelGroup) +- throws ProcessorInstantiationException { ++ throws ProcessorInstantiationException, FlowSynchronizationException { + +- for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { +- final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); +- if (processor == null) { +- final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup); +- LOG.info("Added {} to {}", added, group); +- } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { +- updateProcessor(processor, proposedProcessor, topLevelGroup); +- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, +- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state +- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor))); +- LOG.info("Updated {}", processor); +- } else { +- processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); ++ final Set processorsToRestart = new HashSet<>(); ++ ++ try { ++ for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { ++ final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); ++ if (processor == null) { ++ final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup); ++ LOG.info("Added {} to {}", added, group); ++ } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { ++ final long processorStopDeadline = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); ++ try { ++ final boolean stopped = stopOrTerminate(processor, processorStopDeadline, syncOptions); ++ if (stopped && proposedProcessor.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) { ++ processorsToRestart.add(processor); ++ } ++ } catch (final TimeoutException e) { ++ throw new FlowSynchronizationException("Failed to stop processor " + processor + " in preparation for update", e); ++ } ++ updateProcessor(processor, proposedProcessor, topLevelGroup); ++ // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, ++ // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state ++ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor))); ++ LOG.info("Updated {}", processor); ++ } else { ++ processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); ++ } ++ } ++ } finally { ++ for (final ProcessorNode processor : processorsToRestart) { ++ processor.getProcessGroup().startProcessor(processor, false); ++ notifyScheduledStateChange((ComponentNode) processor, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING); + } + } + } +@@ -1375,7 +1393,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator, + final Map versionedParameterContexts, +- final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException { ++ final Map parameterProviderReferences, ProcessGroup topLevelGroup) ++ throws ProcessorInstantiationException, FlowSynchronizationException { + final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier()); + final ProcessGroup group = context.getFlowManager().createProcessGroup(id); + group.setVersionedComponentId(proposed.getIdentifier()); diff --git a/nifi/stackable/patches/2.6.0/0009-NIFI-15901-Disable-controller-services-before-updati.patch b/nifi/stackable/patches/2.6.0/0009-NIFI-15901-Disable-controller-services-before-updati.patch new file mode 100644 index 000000000..c2b0a196f --- /dev/null +++ b/nifi/stackable/patches/2.6.0/0009-NIFI-15901-Disable-controller-services-before-updati.patch @@ -0,0 +1,76 @@ +From 67b404db6005a90b9ef6fce018524b181dae9b98 Mon Sep 17 00:00:00 2001 +From: Andrew Kenworthy +Date: Tue, 5 May 2026 17:50:06 +0200 +Subject: NIFI-15901 Disable controller services before updating + +--- + ...tandardVersionedComponentSynchronizer.java | 44 ++++++++++++++++--- + 1 file changed, 37 insertions(+), 7 deletions(-) + +diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +index c3e059171e..030f7f5a90 100644 +--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java ++++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +@@ -707,7 +707,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + } + + private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map servicesByVersionedId, +- final ProcessGroup topLevelGroup) { ++ final ProcessGroup topLevelGroup) throws FlowSynchronizationException { + // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller + // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding + // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each +@@ -741,17 +741,47 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + updateControllerService(addedService, proposedService, topLevelGroup); + } + +- // Update all of the Controller Services to match the VersionedControllerService ++ // Update all Controller Services to match the VersionedControllerService. ++ // Services may still be ENABLED here because not all callers disable them before sync-ing. ++ // We must disable before calling updateControllerService, which calls setProperties ++ // which calls verifyModifiable and throws IllegalStateException on ENABLED services. + for (final Map.Entry entry : services.entrySet()) { + final ControllerServiceNode service = entry.getKey(); + final VersionedControllerService proposedService = entry.getValue(); + + if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { +- updateControllerService(service, proposedService, topLevelGroup); +- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, +- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state +- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); +- LOG.info("Updated {}", service); ++ final long stopTimeout = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); ++ final Set referencesToRestart = new HashSet<>(); ++ final Set servicesToRestart = new HashSet<>(); ++ ++ try { ++ try { ++ stopControllerService(service, proposedService, stopTimeout, ++ syncOptions.getComponentStopTimeoutAction(), ++ referencesToRestart, servicesToRestart, syncOptions); ++ } catch (final TimeoutException e) { ++ throw new FlowSynchronizationException("Failed to stop Controller Service " + service + " in preparation for update", e); ++ } catch (final InterruptedException e) { ++ Thread.currentThread().interrupt(); ++ throw new FlowSynchronizationException("Interrupted while stopping Controller Service " + service, e); ++ } ++ updateControllerService(service, proposedService, topLevelGroup); ++ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); ++ LOG.info("Updated {}", service); ++ } finally { ++ // Re-enable services and restart components that were stopped for the update, ++ // restoring the controller to its pre-update running state. ++ if (proposedService.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) { ++ // Use the component scheduler (not the provider directly) which has ++ // already been paused, avoiding a race with the enable loop below. ++ context.getComponentScheduler().enableControllerServicesAsync(servicesToRestart); ++ notifyScheduledStateChange(servicesToRestart, syncOptions, org.apache.nifi.flow.ScheduledState.ENABLED); ++ context.getControllerServiceProvider().scheduleReferencingComponents( ++ service, referencesToRestart, context.getComponentScheduler()); ++ referencesToRestart.forEach(componentNode -> ++ notifyScheduledStateChange(componentNode, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING)); ++ } ++ } + } + } + diff --git a/nifi/stackable/patches/2.7.2/0008-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch b/nifi/stackable/patches/2.7.2/0008-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch new file mode 100644 index 000000000..aafba6b22 --- /dev/null +++ b/nifi/stackable/patches/2.7.2/0008-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch @@ -0,0 +1,117 @@ +From 7dd6e0295bce03dcc86a0c671a73b67641c7af25 Mon Sep 17 00:00:00 2001 +From: Andrew Kenworthy +Date: Tue, 5 May 2026 17:19:10 +0200 +Subject: NIFI-15801 Stop processors in synchronizeProcessors before updating + +--- + ...tandardVersionedComponentSynchronizer.java | 61 ++++++++++++------- + 1 file changed, 40 insertions(+), 21 deletions(-) + +diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +index b79ee4d6e8..d8f2d2da6d 100644 +--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java ++++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +@@ -269,8 +269,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + final ProcessGroup newProcessGroup = addProcessGroup(group, processGroup, options.getComponentIdGenerator(), + additions.getParameterContexts(), additions.getParameterProviders(), group); + additionsBuilder.addProcessGroup(newProcessGroup); +- } catch (final ProcessorInstantiationException pie) { +- throw new RuntimeException(pie); ++ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { ++ throw new RuntimeException(e); + } + }); + +@@ -392,8 +392,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() == null ? group : context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId()); + synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(), + parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings()); +- } catch (final ProcessorInstantiationException pie) { +- throw new RuntimeException(pie); ++ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { ++ throw new RuntimeException(e); + } + }); + +@@ -422,7 +422,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, + final Map parameterProviderReferences, final ProcessGroup topLevelGroup, final boolean updateGroupSettings) +- throws ProcessorInstantiationException { ++ throws ProcessorInstantiationException, FlowSynchronizationException { + + // Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we + // transition the service into the RUNNING state, and then we need to update a Connection that is connected to it, +@@ -687,7 +687,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, + final Map childGroupsByVersionedId, final Map parameterProviderReferences, +- final ProcessGroup topLevelGroup) throws ProcessorInstantiationException { ++ final ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException { + + for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { + final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); +@@ -1189,21 +1189,39 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map processorsByVersionedId, + final ProcessGroup topLevelGroup) +- throws ProcessorInstantiationException { ++ throws ProcessorInstantiationException, FlowSynchronizationException { + +- for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { +- final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); +- if (processor == null) { +- final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup); +- LOG.info("Added {} to {}", added, group); +- } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { +- updateProcessor(processor, proposedProcessor, topLevelGroup); +- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, +- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state +- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor))); +- LOG.info("Updated {}", processor); +- } else { +- processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); ++ final Set processorsToRestart = new HashSet<>(); ++ ++ try { ++ for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { ++ final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); ++ if (processor == null) { ++ final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup); ++ LOG.info("Added {} to {}", added, group); ++ } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { ++ final long processorStopDeadline = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); ++ try { ++ final boolean stopped = stopOrTerminate(processor, processorStopDeadline, syncOptions); ++ if (stopped && proposedProcessor.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) { ++ processorsToRestart.add(processor); ++ } ++ } catch (final TimeoutException e) { ++ throw new FlowSynchronizationException("Failed to stop processor " + processor + " in preparation for update", e); ++ } ++ updateProcessor(processor, proposedProcessor, topLevelGroup); ++ // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, ++ // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state ++ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor))); ++ LOG.info("Updated {}", processor); ++ } else { ++ processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); ++ } ++ } ++ } finally { ++ for (final ProcessorNode processor : processorsToRestart) { ++ processor.getProcessGroup().startProcessor(processor, false); ++ notifyScheduledStateChange((ComponentNode) processor, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING); + } + } + } +@@ -1375,7 +1393,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator, + final Map versionedParameterContexts, +- final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException { ++ final Map parameterProviderReferences, ProcessGroup topLevelGroup) ++ throws ProcessorInstantiationException, FlowSynchronizationException { + final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier()); + final ProcessGroup group = context.getFlowManager().createProcessGroup(id); + group.setVersionedComponentId(proposed.getIdentifier()); diff --git a/nifi/stackable/patches/2.7.2/0009-NIFI-15901-Disable-controller-services-before-updati.patch b/nifi/stackable/patches/2.7.2/0009-NIFI-15901-Disable-controller-services-before-updati.patch new file mode 100644 index 000000000..34186aabe --- /dev/null +++ b/nifi/stackable/patches/2.7.2/0009-NIFI-15901-Disable-controller-services-before-updati.patch @@ -0,0 +1,76 @@ +From 5a16796127e103b181a3cdb760f566c65bab0851 Mon Sep 17 00:00:00 2001 +From: Andrew Kenworthy +Date: Tue, 5 May 2026 17:25:46 +0200 +Subject: NIFI-15901 Disable controller services before updating + +--- + ...tandardVersionedComponentSynchronizer.java | 44 ++++++++++++++++--- + 1 file changed, 37 insertions(+), 7 deletions(-) + +diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +index d8f2d2da6d..01ffa09a4b 100644 +--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java ++++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +@@ -707,7 +707,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + } + + private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map servicesByVersionedId, +- final ProcessGroup topLevelGroup) { ++ final ProcessGroup topLevelGroup) throws FlowSynchronizationException { + // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller + // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding + // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each +@@ -741,17 +741,47 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + updateControllerService(addedService, proposedService, topLevelGroup); + } + +- // Update all of the Controller Services to match the VersionedControllerService ++ // Update all Controller Services to match the VersionedControllerService. ++ // Services may still be ENABLED here because not all callers disable them before sync-ing. ++ // We must disable before calling updateControllerService, which calls setProperties ++ // which calls verifyModifiable and throws IllegalStateException on ENABLED services. + for (final Map.Entry entry : services.entrySet()) { + final ControllerServiceNode service = entry.getKey(); + final VersionedControllerService proposedService = entry.getValue(); + + if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { +- updateControllerService(service, proposedService, topLevelGroup); +- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, +- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state +- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); +- LOG.info("Updated {}", service); ++ final long stopTimeout = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); ++ final Set referencesToRestart = new HashSet<>(); ++ final Set servicesToRestart = new HashSet<>(); ++ ++ try { ++ try { ++ stopControllerService(service, proposedService, stopTimeout, ++ syncOptions.getComponentStopTimeoutAction(), ++ referencesToRestart, servicesToRestart, syncOptions); ++ } catch (final TimeoutException e) { ++ throw new FlowSynchronizationException("Failed to stop Controller Service " + service + " in preparation for update", e); ++ } catch (final InterruptedException e) { ++ Thread.currentThread().interrupt(); ++ throw new FlowSynchronizationException("Interrupted while stopping Controller Service " + service, e); ++ } ++ updateControllerService(service, proposedService, topLevelGroup); ++ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); ++ LOG.info("Updated {}", service); ++ } finally { ++ // Re-enable services and restart components that were stopped for the update, ++ // restoring the controller to its pre-update running state. ++ if (proposedService.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) { ++ // Use the component scheduler (not the provider directly) which has ++ // already been paused, avoiding a race with the enable loop below. ++ context.getComponentScheduler().enableControllerServicesAsync(servicesToRestart); ++ notifyScheduledStateChange(servicesToRestart, syncOptions, org.apache.nifi.flow.ScheduledState.ENABLED); ++ context.getControllerServiceProvider().scheduleReferencingComponents( ++ service, referencesToRestart, context.getComponentScheduler()); ++ referencesToRestart.forEach(componentNode -> ++ notifyScheduledStateChange(componentNode, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING)); ++ } ++ } + } + } + diff --git a/nifi/stackable/patches/2.9.0/0006-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch b/nifi/stackable/patches/2.9.0/0006-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch new file mode 100644 index 000000000..b497941d0 --- /dev/null +++ b/nifi/stackable/patches/2.9.0/0006-NIFI-15801-Stop-processors-in-synchronizeProcessors-.patch @@ -0,0 +1,117 @@ +From 2cf0721126a21eb429500000de96372272d9d3b5 Mon Sep 17 00:00:00 2001 +From: Andrew Kenworthy +Date: Tue, 5 May 2026 16:48:22 +0200 +Subject: NIFI-15801 Stop processors in synchronizeProcessors before updating + +--- + ...tandardVersionedComponentSynchronizer.java | 61 ++++++++++++------- + 1 file changed, 40 insertions(+), 21 deletions(-) + +diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +index 092d2f7e7b..2c64fa8cae 100644 +--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java ++++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +@@ -270,8 +270,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + final ProcessGroup newProcessGroup = addProcessGroup(group, processGroup, options.getComponentIdGenerator(), + additions.getParameterContexts(), additions.getParameterProviders(), group); + additionsBuilder.addProcessGroup(newProcessGroup); +- } catch (final ProcessorInstantiationException pie) { +- throw new RuntimeException(pie); ++ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { ++ throw new RuntimeException(e); + } + }); + +@@ -386,8 +386,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() == null ? group : context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId()); + synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(), + parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings()); +- } catch (final ProcessorInstantiationException pie) { +- throw new RuntimeException(pie); ++ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { ++ throw new RuntimeException(e); + } + }); + +@@ -416,7 +416,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, + final Map parameterProviderReferences, final ProcessGroup topLevelGroup, final boolean updateGroupSettings) +- throws ProcessorInstantiationException { ++ throws ProcessorInstantiationException, FlowSynchronizationException { + + // Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we + // transition the service into the RUNNING state, and then we need to update a Connection that is connected to it, +@@ -691,7 +691,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, + final Map childGroupsByVersionedId, final Map parameterProviderReferences, +- final ProcessGroup topLevelGroup) throws ProcessorInstantiationException { ++ final ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException { + + for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { + final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); +@@ -1193,21 +1193,39 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map processorsByVersionedId, + final ProcessGroup topLevelGroup) +- throws ProcessorInstantiationException { ++ throws ProcessorInstantiationException, FlowSynchronizationException { + +- for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { +- final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); +- if (processor == null) { +- final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup); +- LOG.info("Added {} to {}", added, group); +- } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { +- updateProcessor(processor, proposedProcessor, topLevelGroup); +- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, +- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state +- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor))); +- LOG.info("Updated {}", processor); +- } else { +- processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); ++ final Set processorsToRestart = new HashSet<>(); ++ ++ try { ++ for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { ++ final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); ++ if (processor == null) { ++ final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup); ++ LOG.info("Added {} to {}", added, group); ++ } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { ++ final long processorStopDeadline = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); ++ try { ++ final boolean stopped = stopOrTerminate(processor, processorStopDeadline, syncOptions); ++ if (stopped && proposedProcessor.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) { ++ processorsToRestart.add(processor); ++ } ++ } catch (final TimeoutException e) { ++ throw new FlowSynchronizationException("Failed to stop processor " + processor + " in preparation for update", e); ++ } ++ updateProcessor(processor, proposedProcessor, topLevelGroup); ++ // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, ++ // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state ++ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor))); ++ LOG.info("Updated {}", processor); ++ } else { ++ processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); ++ } ++ } ++ } finally { ++ for (final ProcessorNode processor : processorsToRestart) { ++ processor.getProcessGroup().startProcessor(processor, false); ++ notifyScheduledStateChange((ComponentNode) processor, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING); + } + } + } +@@ -1379,7 +1397,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + + private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator, + final Map versionedParameterContexts, +- final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException { ++ final Map parameterProviderReferences, ProcessGroup topLevelGroup) ++ throws ProcessorInstantiationException, FlowSynchronizationException { + final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier()); + final String connectorId = destination.getConnectorIdentifier().orElse(null); + final ProcessGroup group = context.getFlowManager().createProcessGroup(id, connectorId); diff --git a/nifi/stackable/patches/2.9.0/0007-NIFI-15901-Disable-controller-services-before-updati.patch b/nifi/stackable/patches/2.9.0/0007-NIFI-15901-Disable-controller-services-before-updati.patch new file mode 100644 index 000000000..94c196c98 --- /dev/null +++ b/nifi/stackable/patches/2.9.0/0007-NIFI-15901-Disable-controller-services-before-updati.patch @@ -0,0 +1,76 @@ +From 18fd66eeba5ebf61048576cc48500611a3e2e5ba Mon Sep 17 00:00:00 2001 +From: Andrew Kenworthy +Date: Tue, 5 May 2026 16:57:20 +0200 +Subject: NIFI-15901 Disable controller services before updating + +--- + ...tandardVersionedComponentSynchronizer.java | 44 ++++++++++++++++--- + 1 file changed, 37 insertions(+), 7 deletions(-) + +diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +index 2c64fa8cae..028dfbf90b 100644 +--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java ++++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +@@ -711,7 +711,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + } + + private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map servicesByVersionedId, +- final ProcessGroup topLevelGroup) { ++ final ProcessGroup topLevelGroup) throws FlowSynchronizationException { + // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller + // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding + // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each +@@ -745,17 +745,47 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen + updateControllerService(addedService, proposedService, topLevelGroup); + } + +- // Update all of the Controller Services to match the VersionedControllerService ++ // Update all Controller Services to match the VersionedControllerService. ++ // Services may still be ENABLED here because not all callers disable them before sync-ing. ++ // We must disable before calling updateControllerService, which calls setProperties ++ // which calls verifyModifiable and throws IllegalStateException on ENABLED services. + for (final Map.Entry entry : services.entrySet()) { + final ControllerServiceNode service = entry.getKey(); + final VersionedControllerService proposedService = entry.getValue(); + + if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { +- updateControllerService(service, proposedService, topLevelGroup); +- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, +- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state +- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); +- LOG.info("Updated {}", service); ++ final long stopTimeout = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); ++ final Set referencesToRestart = new HashSet<>(); ++ final Set servicesToRestart = new HashSet<>(); ++ ++ try { ++ try { ++ stopControllerService(service, proposedService, stopTimeout, ++ syncOptions.getComponentStopTimeoutAction(), ++ referencesToRestart, servicesToRestart, syncOptions); ++ } catch (final TimeoutException e) { ++ throw new FlowSynchronizationException("Failed to stop Controller Service " + service + " in preparation for update", e); ++ } catch (final InterruptedException e) { ++ Thread.currentThread().interrupt(); ++ throw new FlowSynchronizationException("Interrupted while stopping Controller Service " + service, e); ++ } ++ updateControllerService(service, proposedService, topLevelGroup); ++ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service))); ++ LOG.info("Updated {}", service); ++ } finally { ++ // Re-enable services and restart components that were stopped for the update, ++ // restoring the controller to its pre-update running state. ++ if (proposedService.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) { ++ // Use the component scheduler (not the provider directly) which has ++ // already been paused, avoiding a race with the enable loop below. ++ context.getComponentScheduler().enableControllerServicesAsync(servicesToRestart); ++ notifyScheduledStateChange(servicesToRestart, syncOptions, org.apache.nifi.flow.ScheduledState.ENABLED); ++ context.getControllerServiceProvider().scheduleReferencingComponents( ++ service, referencesToRestart, context.getComponentScheduler()); ++ referencesToRestart.forEach(componentNode -> ++ notifyScheduledStateChange(componentNode, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING)); ++ } ++ } + } + } +