Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -454,17 +454,6 @@ private void synchronize(final ProcessGroup group, final VersionedProcessGroup p
// parameter contexts that inherit from one another and neither the inheriting nor inherited parameter context exists.
if (versionedParameterContexts != null) {
versionedParameterContexts.values().forEach(this::createParameterContextWithoutReferences);

// After ensuring all contexts exist, add any missing parameters to all existing contexts from the proposed definitions.
// This is necessary because createParameterContextWithoutReferences skips contexts that already exist, so new parameters
// added to inherited contexts (e.g., a parent P2 inherited by the group's bound context P1) would otherwise be missed.
final ComponentIdGenerator componentIdGenerator = context.getComponentIdGenerator();
for (final Map.Entry<String, VersionedParameterContext> entry : versionedParameterContexts.entrySet()) {
final ParameterContext existingContext = getParameterContextByName(entry.getKey());
if (existingContext != null) {
addMissingConfiguration(entry.getValue(), existingContext, versionedParameterContexts, parameterProviderReferences, componentIdGenerator);
}
}
}

updateParameterContext(group, proposed, versionedParameterContexts, parameterProviderReferences, context.getComponentIdGenerator());
Expand Down Expand Up @@ -2404,6 +2393,17 @@ private void addMissingConfiguration(final VersionedParameterContext versionedPa
final Map<String, VersionedParameterContext> versionedParameterContexts,
final Map<String, ParameterProviderReference> parameterProviderReferences,
final ComponentIdGenerator componentIdGenerator) {
addMissingConfiguration(versionedParameterContext, currentParameterContext, versionedParameterContexts, parameterProviderReferences, componentIdGenerator, new HashSet<>());
}

private void addMissingConfiguration(final VersionedParameterContext versionedParameterContext, final ParameterContext currentParameterContext,
final Map<String, VersionedParameterContext> versionedParameterContexts,
final Map<String, ParameterProviderReference> parameterProviderReferences,
final ComponentIdGenerator componentIdGenerator, final Set<String> visitedParameterContextIds) {
if (!visitedParameterContextIds.add(currentParameterContext.getIdentifier())) {
return;
}

final Map<String, Parameter> parameters = new HashMap<>();
for (final VersionedParameter versionedParameter : versionedParameterContext.getParameters()) {
final Optional<Parameter> parameterOption = currentParameterContext.getParameter(versionedParameter.getName());
Expand All @@ -2429,13 +2429,38 @@ private void addMissingConfiguration(final VersionedParameterContext versionedPa
currentParameterContext.setDescription(versionedParameterContext.getDescription());
}

// If the current parameter context doesn't have any inherited param contexts but the versioned one does,
// add the versioned ones.
if (currentParameterContext.getInheritedParameterContexts().isEmpty()
&& versionedParameterContext.getInheritedParameterContexts() != null && !versionedParameterContext.getInheritedParameterContexts().isEmpty()) {
currentParameterContext.setInheritedParameterContexts(versionedParameterContext.getInheritedParameterContexts().stream()
.map(name -> selectParameterContext(versionedParameterContexts.get(name), versionedParameterContexts, parameterProviderReferences, componentIdGenerator))
.collect(Collectors.toList()));
final List<String> proposedInheritedNames = versionedParameterContext.getInheritedParameterContexts();
final List<ParameterContext> currentInheritedContexts = currentParameterContext.getInheritedParameterContexts();
if (proposedInheritedNames != null && !proposedInheritedNames.isEmpty()) {
if (currentInheritedContexts.isEmpty()) {
// The local parameter context has no inheritance configured yet, so adopt the versioned chain
// by selecting (or creating) a matching parameter context for each inherited name.
currentParameterContext.setInheritedParameterContexts(proposedInheritedNames.stream()
.map(name -> selectParameterContext(versionedParameterContexts.get(name), versionedParameterContexts, parameterProviderReferences, componentIdGenerator))
.collect(Collectors.toList()));
} else {
// Walk the local inheritance chain in lockstep with the versioned chain so updates to inherited
// contexts are applied to the contexts actually referenced by this parameter context, even when
// the local names were suffix-renamed at import time (for example, P (2) instead of P). Pairs that
// do not match by exact name or by name-with-suffix are skipped to avoid corrupting a chain that
// was rewired locally.
final int matchedDepth = Math.min(currentInheritedContexts.size(), proposedInheritedNames.size());
for (int i = 0; i < matchedDepth; i++) {
final ParameterContext liveInheritedContext = currentInheritedContexts.get(i);
final String proposedInheritedName = proposedInheritedNames.get(i);
final VersionedParameterContext proposedInheritedContext = versionedParameterContexts == null ? null : versionedParameterContexts.get(proposedInheritedName);
if (liveInheritedContext == null || proposedInheritedContext == null) {
continue;
}
final String liveInheritedName = liveInheritedContext.getName();
if (!liveInheritedName.equals(proposedInheritedName)
&& !ParameterContextNameUtils.isNameWithSuffix(liveInheritedName, proposedInheritedName)) {
continue;
}
addMissingConfiguration(proposedInheritedContext, liveInheritedContext, versionedParameterContexts, parameterProviderReferences,
componentIdGenerator, visitedParameterContextIds);
}
}
}

if (versionedParameterContext.getParameterProvider() != null && currentParameterContext.getParameterProvider() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,86 @@ void testNewParameterInInheritedContextSynchronizedDuringUpgrade() throws NiFiCl
assertTrue(p2NamesAfterUpgrade.contains("paramX"), "paramX should exist on P2 after upgrading to version 2");
}

/**
* Verifies that a new parameter introduced by a new flow version is applied only to the parameter context
* actually bound to the upgraded process group, even when the local flow has multiple deployments of the
* same versioned flow with REPLACE-strategy suffix-renamed parameter contexts (P, P (1), P (2)).
*
* Scenario: Flow F has parameter context P. F is imported three times with the REPLACE strategy, producing
* deployments bound to P, P (1), and P (2) respectively. Version 2 of F adds parameter Z to P. Upgrading
* the third deployment must apply Z only to P (2); P and P (1) must remain unchanged.
*/
@Test
void testNewParameterAppliedOnlyToBoundSuffixedContextDuringUpgrade() throws NiFiClientException, IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient();
final NiFiClientUtil util = getClientUtil();

final ParameterContextEntity sourceParamContextP = util.createParameterContext(PARAMETER_CONTEXT_NAME, Map.of(PARAMETER_NAME, PARAMETER_VALUE));
final ProcessGroupEntity sourceGroupA = util.createProcessGroup(GROUP_A_NAME, "root");
util.setParameterContext(sourceGroupA.getId(), sourceParamContextP);

final ProcessorEntity processor = util.createProcessor(PROCESSOR_TYPE, sourceGroupA.getId());
util.updateProcessorProperties(processor, Collections.singletonMap(PROCESSOR_PROPERTY_TEXT, PARAMETER_REFERENCE));
util.setAutoTerminatedRelationships(processor, RELATIONSHIP_SUCCESS);

final VersionControlInformationEntity vciV1 = util.startVersionControl(sourceGroupA, clientEntity, TEST_FLOWS_BUCKET, FLOW_NAME);
final String flowId = vciV1.getVersionControlInformation().getFlowId();

final String paramZName = "paramZ";
final String paramZValue = "valueZ";
final ParameterContextEntity currentSourceP = getNifiClient().getParamContextClient().getParamContext(sourceParamContextP.getId(), false);
final ParameterContextUpdateRequestEntity sourceUpdate = util.updateParameterContext(currentSourceP,
Map.of(PARAMETER_NAME, PARAMETER_VALUE, paramZName, paramZValue));
util.waitForParameterContextRequestToComplete(sourceParamContextP.getId(), sourceUpdate.getRequest().getRequestId());

final ProcessGroupEntity sourceGroupARefreshed = getNifiClient().getProcessGroupClient().getProcessGroup(sourceGroupA.getId());
util.saveFlowVersion(sourceGroupARefreshed, clientEntity, vciV1);

final ProcessGroupEntity sourceForStopVc = getNifiClient().getProcessGroupClient().getProcessGroup(sourceGroupA.getId());
getNifiClient().getVersionsClient().stopVersionControl(sourceForStopVc);
util.deleteAll(sourceGroupA.getId());
final ProcessGroupEntity sourceToDelete = getNifiClient().getProcessGroupClient().getProcessGroup(sourceGroupA.getId());
getNifiClient().getProcessGroupClient().deleteProcessGroup(sourceToDelete);

final ParameterContextEntity sourceContextToDelete = getNifiClient().getParamContextClient().getParamContext(sourceParamContextP.getId(), false);
getNifiClient().getParamContextClient().deleteParamContext(sourceParamContextP.getId(),
String.valueOf(sourceContextToDelete.getRevision().getVersion()));

final ProcessGroupEntity importedA1 = importFlowWithReplaceParameterContext(clientEntity.getId(), flowId, VERSION_1);
final String paramContextId1 = getNifiClient().getProcessGroupClient().getProcessGroup(importedA1.getId())
.getComponent().getParameterContext().getId();

final ProcessGroupEntity importedA2 = importFlowWithReplaceParameterContext(clientEntity.getId(), flowId, VERSION_1);
final String paramContextId2 = getNifiClient().getProcessGroupClient().getProcessGroup(importedA2.getId())
.getComponent().getParameterContext().getId();

final ProcessGroupEntity importedA3 = importFlowWithReplaceParameterContext(clientEntity.getId(), flowId, VERSION_1);
final String paramContextId3 = getNifiClient().getProcessGroupClient().getProcessGroup(importedA3.getId())
.getComponent().getParameterContext().getId();

assertNotEquals(paramContextId1, paramContextId2);
assertNotEquals(paramContextId2, paramContextId3);
assertNotEquals(paramContextId1, paramContextId3);

assertFalse(getParameterNames(getNifiClient().getParamContextClient().getParamContext(paramContextId1, false)).contains(paramZName));
assertFalse(getParameterNames(getNifiClient().getParamContextClient().getParamContext(paramContextId2, false)).contains(paramZName));
assertFalse(getParameterNames(getNifiClient().getParamContextClient().getParamContext(paramContextId3, false)).contains(paramZName));

util.changeFlowVersion(importedA3.getId(), VERSION_2);

final ParameterContextEntity context3AfterUpgrade = getNifiClient().getParamContextClient().getParamContext(paramContextId3, false);
assertTrue(getParameterNames(context3AfterUpgrade).contains(paramZName),
"paramZ should be added to the parameter context bound to the upgraded deployment");

final ParameterContextEntity context1AfterUpgrade = getNifiClient().getParamContextClient().getParamContext(paramContextId1, false);
assertFalse(getParameterNames(context1AfterUpgrade).contains(paramZName),
"paramZ should not leak into the canonical parameter context bound to a different deployment");
Comment thread
pvillard31 marked this conversation as resolved.

final ParameterContextEntity context2AfterUpgrade = getNifiClient().getParamContextClient().getParamContext(paramContextId2, false);
assertFalse(getParameterNames(context2AfterUpgrade).contains(paramZName),
"paramZ should not leak into the suffixed parameter context bound to a different deployment");
}

/**
* Verifies that parameter and parameter context descriptions are updated when upgrading a versioned
* process group from one version to the next, even when the parameter value itself remains unchanged
Expand Down
Loading