From 9d62357d5954874261a9c000230f186fb8966a72 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Fri, 15 May 2026 18:58:14 +0200 Subject: [PATCH 1/2] NIFI-15945 - Parameter not correctly added to parameter context when having multiple suffixed parameter contexts --- ...tandardVersionedComponentSynchronizer.java | 61 +++++++++---- .../ParameterContextPreservationIT.java | 89 +++++++++++++++++++ 2 files changed, 132 insertions(+), 18 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index e3704ead9ade..d5f1db9d7bee 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -454,17 +454,6 @@ private void synchronize(final ProcessGroup group, final VersionedProcessGroup p // parameter contexts that inherit from one another and neither the inheriting nor inherited parameter context exists. if (versionedParameterContexts != null) { versionedParameterContexts.values().forEach(this::createParameterContextWithoutReferences); - - // After ensuring all contexts exist, add any missing parameters to all existing contexts from the proposed definitions. - // This is necessary because createParameterContextWithoutReferences skips contexts that already exist, so new parameters - // added to inherited contexts (e.g., a parent P2 inherited by the group's bound context P1) would otherwise be missed. - final ComponentIdGenerator componentIdGenerator = context.getComponentIdGenerator(); - for (final Map.Entry entry : versionedParameterContexts.entrySet()) { - final ParameterContext existingContext = getParameterContextByName(entry.getKey()); - if (existingContext != null) { - addMissingConfiguration(entry.getValue(), existingContext, versionedParameterContexts, parameterProviderReferences, componentIdGenerator); - } - } } updateParameterContext(group, proposed, versionedParameterContexts, parameterProviderReferences, context.getComponentIdGenerator()); @@ -2404,6 +2393,17 @@ private void addMissingConfiguration(final VersionedParameterContext versionedPa final Map versionedParameterContexts, final Map parameterProviderReferences, final ComponentIdGenerator componentIdGenerator) { + addMissingConfiguration(versionedParameterContext, currentParameterContext, versionedParameterContexts, parameterProviderReferences, componentIdGenerator, new HashSet<>()); + } + + private void addMissingConfiguration(final VersionedParameterContext versionedParameterContext, final ParameterContext currentParameterContext, + final Map versionedParameterContexts, + final Map parameterProviderReferences, + final ComponentIdGenerator componentIdGenerator, final Set visitedParameterContextIds) { + if (!visitedParameterContextIds.add(currentParameterContext.getIdentifier())) { + return; + } + final Map parameters = new HashMap<>(); for (final VersionedParameter versionedParameter : versionedParameterContext.getParameters()) { final Optional parameterOption = currentParameterContext.getParameter(versionedParameter.getName()); @@ -2429,13 +2429,38 @@ private void addMissingConfiguration(final VersionedParameterContext versionedPa currentParameterContext.setDescription(versionedParameterContext.getDescription()); } - // If the current parameter context doesn't have any inherited param contexts but the versioned one does, - // add the versioned ones. - if (currentParameterContext.getInheritedParameterContexts().isEmpty() - && versionedParameterContext.getInheritedParameterContexts() != null && !versionedParameterContext.getInheritedParameterContexts().isEmpty()) { - currentParameterContext.setInheritedParameterContexts(versionedParameterContext.getInheritedParameterContexts().stream() - .map(name -> selectParameterContext(versionedParameterContexts.get(name), versionedParameterContexts, parameterProviderReferences, componentIdGenerator)) - .collect(Collectors.toList())); + final List proposedInheritedNames = versionedParameterContext.getInheritedParameterContexts(); + final List currentInheritedContexts = currentParameterContext.getInheritedParameterContexts(); + if (proposedInheritedNames != null && !proposedInheritedNames.isEmpty()) { + if (currentInheritedContexts.isEmpty()) { + // The local parameter context has no inheritance configured yet, so adopt the versioned chain + // by selecting (or creating) a matching parameter context for each inherited name. + currentParameterContext.setInheritedParameterContexts(proposedInheritedNames.stream() + .map(name -> selectParameterContext(versionedParameterContexts.get(name), versionedParameterContexts, parameterProviderReferences, componentIdGenerator)) + .collect(Collectors.toList())); + } else { + // Walk the local inheritance chain in lockstep with the versioned chain so updates to inherited + // contexts are applied to the contexts actually referenced by this parameter context, even when + // the local names were suffix-renamed at import time (for example, P (2) instead of P). Pairs that + // do not match by exact name or by name-with-suffix are skipped to avoid corrupting a chain that + // was rewired locally. + final int matchedDepth = Math.min(currentInheritedContexts.size(), proposedInheritedNames.size()); + for (int i = 0; i < matchedDepth; i++) { + final ParameterContext liveInheritedContext = currentInheritedContexts.get(i); + final String proposedInheritedName = proposedInheritedNames.get(i); + final VersionedParameterContext proposedInheritedContext = versionedParameterContexts == null ? null : versionedParameterContexts.get(proposedInheritedName); + if (liveInheritedContext == null || proposedInheritedContext == null) { + continue; + } + final String liveInheritedName = liveInheritedContext.getName(); + if (!liveInheritedName.equals(proposedInheritedName) + && !ParameterContextNameUtils.isNameWithSuffix(liveInheritedName, proposedInheritedName)) { + continue; + } + addMissingConfiguration(proposedInheritedContext, liveInheritedContext, versionedParameterContexts, parameterProviderReferences, + componentIdGenerator, visitedParameterContextIds); + } + } } if (versionedParameterContext.getParameterProvider() != null && currentParameterContext.getParameterProvider() == null) { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/ParameterContextPreservationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/ParameterContextPreservationIT.java index c9e26318a5ec..0c154cae9ea3 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/ParameterContextPreservationIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/ParameterContextPreservationIT.java @@ -245,6 +245,95 @@ void testNewParameterInInheritedContextSynchronizedDuringUpgrade() throws NiFiCl assertTrue(p2NamesAfterUpgrade.contains("paramX"), "paramX should exist on P2 after upgrading to version 2"); } + /** + * Verifies that a new parameter introduced by a new flow version is applied only to the parameter context + * actually bound to the upgraded process group, even when the local flow has multiple deployments of the + * same versioned flow with REPLACE-strategy suffix-renamed parameter contexts (P, P (1), P (2)). + * + * Scenario: Flow F has parameter context P. F is imported three times with the REPLACE strategy, producing + * deployments bound to P, P (1), and P (2) respectively. Version 2 of F adds parameter Z to P. Upgrading + * the third deployment must apply Z only to P (2); P and P (1) must remain unchanged. + */ + @Test + void testNewParameterAppliedOnlyToBoundSuffixedContextDuringUpgrade() throws NiFiClientException, IOException, InterruptedException { + final FlowRegistryClientEntity clientEntity = registerClient(); + final NiFiClientUtil util = getClientUtil(); + + // Build the source flow A bound to parameter context P, save as version 1 + final ParameterContextEntity sourceParamContextP = util.createParameterContext(PARAMETER_CONTEXT_NAME, Map.of(PARAMETER_NAME, PARAMETER_VALUE)); + final ProcessGroupEntity sourceGroupA = util.createProcessGroup(GROUP_A_NAME, "root"); + util.setParameterContext(sourceGroupA.getId(), sourceParamContextP); + + final ProcessorEntity processor = util.createProcessor(PROCESSOR_TYPE, sourceGroupA.getId()); + util.updateProcessorProperties(processor, Collections.singletonMap(PROCESSOR_PROPERTY_TEXT, PARAMETER_REFERENCE)); + util.setAutoTerminatedRelationships(processor, RELATIONSHIP_SUCCESS); + + final VersionControlInformationEntity vciV1 = util.startVersionControl(sourceGroupA, clientEntity, TEST_FLOWS_BUCKET, FLOW_NAME); + final String flowId = vciV1.getVersionControlInformation().getFlowId(); + + // Add new parameter paramZ to P and save as version 2 + final String paramZName = "paramZ"; + final String paramZValue = "valueZ"; + final ParameterContextEntity currentSourceP = getNifiClient().getParamContextClient().getParamContext(sourceParamContextP.getId(), false); + final ParameterContextUpdateRequestEntity sourceUpdate = util.updateParameterContext(currentSourceP, + Map.of(PARAMETER_NAME, PARAMETER_VALUE, paramZName, paramZValue)); + util.waitForParameterContextRequestToComplete(sourceParamContextP.getId(), sourceUpdate.getRequest().getRequestId()); + + final ProcessGroupEntity sourceGroupARefreshed = getNifiClient().getProcessGroupClient().getProcessGroup(sourceGroupA.getId()); + util.saveFlowVersion(sourceGroupARefreshed, clientEntity, vciV1); + + // Clean up the source flow and parameter context so that subsequent imports start from a clean slate + final ProcessGroupEntity sourceForStopVc = getNifiClient().getProcessGroupClient().getProcessGroup(sourceGroupA.getId()); + getNifiClient().getVersionsClient().stopVersionControl(sourceForStopVc); + util.deleteAll(sourceGroupA.getId()); + final ProcessGroupEntity sourceToDelete = getNifiClient().getProcessGroupClient().getProcessGroup(sourceGroupA.getId()); + getNifiClient().getProcessGroupClient().deleteProcessGroup(sourceToDelete); + + final ParameterContextEntity sourceContextToDelete = getNifiClient().getParamContextClient().getParamContext(sourceParamContextP.getId(), false); + getNifiClient().getParamContextClient().deleteParamContext(sourceParamContextP.getId(), + String.valueOf(sourceContextToDelete.getRevision().getVersion())); + + // Import version 1 three times. With the REPLACE strategy, each import creates a new parameter context: + // A1 -> P, A2 -> P (1), A3 -> P (2). + final ProcessGroupEntity importedA1 = importFlowWithReplaceParameterContext(clientEntity.getId(), flowId, VERSION_1); + final String paramContextId1 = getNifiClient().getProcessGroupClient().getProcessGroup(importedA1.getId()) + .getComponent().getParameterContext().getId(); + + final ProcessGroupEntity importedA2 = importFlowWithReplaceParameterContext(clientEntity.getId(), flowId, VERSION_1); + final String paramContextId2 = getNifiClient().getProcessGroupClient().getProcessGroup(importedA2.getId()) + .getComponent().getParameterContext().getId(); + + final ProcessGroupEntity importedA3 = importFlowWithReplaceParameterContext(clientEntity.getId(), flowId, VERSION_1); + final String paramContextId3 = getNifiClient().getProcessGroupClient().getProcessGroup(importedA3.getId()) + .getComponent().getParameterContext().getId(); + + assertNotEquals(paramContextId1, paramContextId2); + assertNotEquals(paramContextId2, paramContextId3); + assertNotEquals(paramContextId1, paramContextId3); + + // None of the imported parameter contexts should have paramZ after importing version 1 + assertFalse(getParameterNames(getNifiClient().getParamContextClient().getParamContext(paramContextId1, false)).contains(paramZName)); + assertFalse(getParameterNames(getNifiClient().getParamContextClient().getParamContext(paramContextId2, false)).contains(paramZName)); + assertFalse(getParameterNames(getNifiClient().getParamContextClient().getParamContext(paramContextId3, false)).contains(paramZName)); + + // Upgrade only the third deployment (A3, bound to P (2)) to version 2 + util.changeFlowVersion(importedA3.getId(), VERSION_2); + + // The parameter context bound to A3 must contain the new parameter + final ParameterContextEntity context3AfterUpgrade = getNifiClient().getParamContextClient().getParamContext(paramContextId3, false); + assertTrue(getParameterNames(context3AfterUpgrade).contains(paramZName), + "paramZ should be added to the parameter context bound to the upgraded deployment"); + + // The parameter contexts bound to the other deployments must remain unchanged + final ParameterContextEntity context1AfterUpgrade = getNifiClient().getParamContextClient().getParamContext(paramContextId1, false); + assertFalse(getParameterNames(context1AfterUpgrade).contains(paramZName), + "paramZ should not leak into the canonical parameter context bound to a different deployment"); + + final ParameterContextEntity context2AfterUpgrade = getNifiClient().getParamContextClient().getParamContext(paramContextId2, false); + assertFalse(getParameterNames(context2AfterUpgrade).contains(paramZName), + "paramZ should not leak into the suffixed parameter context bound to a different deployment"); + } + /** * Verifies that parameter and parameter context descriptions are updated when upgrading a versioned * process group from one version to the next, even when the parameter value itself remains unchanged From 2e7e61c6d113088fb14cbd66c6ef41d26e9f3899 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Sat, 16 May 2026 20:46:58 +0200 Subject: [PATCH 2/2] review --- .../system/registry/ParameterContextPreservationIT.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/ParameterContextPreservationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/ParameterContextPreservationIT.java index 0c154cae9ea3..803180215019 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/ParameterContextPreservationIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/ParameterContextPreservationIT.java @@ -259,7 +259,6 @@ void testNewParameterAppliedOnlyToBoundSuffixedContextDuringUpgrade() throws NiF final FlowRegistryClientEntity clientEntity = registerClient(); final NiFiClientUtil util = getClientUtil(); - // Build the source flow A bound to parameter context P, save as version 1 final ParameterContextEntity sourceParamContextP = util.createParameterContext(PARAMETER_CONTEXT_NAME, Map.of(PARAMETER_NAME, PARAMETER_VALUE)); final ProcessGroupEntity sourceGroupA = util.createProcessGroup(GROUP_A_NAME, "root"); util.setParameterContext(sourceGroupA.getId(), sourceParamContextP); @@ -271,7 +270,6 @@ void testNewParameterAppliedOnlyToBoundSuffixedContextDuringUpgrade() throws NiF final VersionControlInformationEntity vciV1 = util.startVersionControl(sourceGroupA, clientEntity, TEST_FLOWS_BUCKET, FLOW_NAME); final String flowId = vciV1.getVersionControlInformation().getFlowId(); - // Add new parameter paramZ to P and save as version 2 final String paramZName = "paramZ"; final String paramZValue = "valueZ"; final ParameterContextEntity currentSourceP = getNifiClient().getParamContextClient().getParamContext(sourceParamContextP.getId(), false); @@ -282,7 +280,6 @@ void testNewParameterAppliedOnlyToBoundSuffixedContextDuringUpgrade() throws NiF final ProcessGroupEntity sourceGroupARefreshed = getNifiClient().getProcessGroupClient().getProcessGroup(sourceGroupA.getId()); util.saveFlowVersion(sourceGroupARefreshed, clientEntity, vciV1); - // Clean up the source flow and parameter context so that subsequent imports start from a clean slate final ProcessGroupEntity sourceForStopVc = getNifiClient().getProcessGroupClient().getProcessGroup(sourceGroupA.getId()); getNifiClient().getVersionsClient().stopVersionControl(sourceForStopVc); util.deleteAll(sourceGroupA.getId()); @@ -293,8 +290,6 @@ void testNewParameterAppliedOnlyToBoundSuffixedContextDuringUpgrade() throws NiF getNifiClient().getParamContextClient().deleteParamContext(sourceParamContextP.getId(), String.valueOf(sourceContextToDelete.getRevision().getVersion())); - // Import version 1 three times. With the REPLACE strategy, each import creates a new parameter context: - // A1 -> P, A2 -> P (1), A3 -> P (2). final ProcessGroupEntity importedA1 = importFlowWithReplaceParameterContext(clientEntity.getId(), flowId, VERSION_1); final String paramContextId1 = getNifiClient().getProcessGroupClient().getProcessGroup(importedA1.getId()) .getComponent().getParameterContext().getId(); @@ -311,20 +306,16 @@ void testNewParameterAppliedOnlyToBoundSuffixedContextDuringUpgrade() throws NiF assertNotEquals(paramContextId2, paramContextId3); assertNotEquals(paramContextId1, paramContextId3); - // None of the imported parameter contexts should have paramZ after importing version 1 assertFalse(getParameterNames(getNifiClient().getParamContextClient().getParamContext(paramContextId1, false)).contains(paramZName)); assertFalse(getParameterNames(getNifiClient().getParamContextClient().getParamContext(paramContextId2, false)).contains(paramZName)); assertFalse(getParameterNames(getNifiClient().getParamContextClient().getParamContext(paramContextId3, false)).contains(paramZName)); - // Upgrade only the third deployment (A3, bound to P (2)) to version 2 util.changeFlowVersion(importedA3.getId(), VERSION_2); - // The parameter context bound to A3 must contain the new parameter final ParameterContextEntity context3AfterUpgrade = getNifiClient().getParamContextClient().getParamContext(paramContextId3, false); assertTrue(getParameterNames(context3AfterUpgrade).contains(paramZName), "paramZ should be added to the parameter context bound to the upgraded deployment"); - // The parameter contexts bound to the other deployments must remain unchanged final ParameterContextEntity context1AfterUpgrade = getNifiClient().getParamContextClient().getParamContext(paramContextId1, false); assertFalse(getParameterNames(context1AfterUpgrade).contains(paramZName), "paramZ should not leak into the canonical parameter context bound to a different deployment");