DPDK: 5-tuple-swap multiprocess testpmd on a single port#4241
Conversation
Pull request overview
This PR adds a new DPDK test that validates multiprocess testpmd functionality using a 5-tuple-swap forwarding mode with a single port setup. The test involves two VMs: one VM runs both sender and receiver processes using DPDK multiprocessing, while the other VM acts as a forwarder that swaps MAC and IP addresses to return traffic back to the sender VM.
Changes:
- Introduces multiprocess DPDK support with new enums for process roles and forwarding modes
- Adds `generate_5tswap_run_info` function to configure the 3-process test topology
- Updates command handling throughout to support multiple commands per node (`List[str]` instead of `str`)
- Adds new test case `verify_dpdk_testpmd_5tswap_gb_hugepages_netvsc` to the test suite
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| lisa/microsoft/testsuites/dpdk/common.py | Adds DpdkMpRole and TestpmdForwardMode enums to support multiprocess contexts and different forwarding modes |
| lisa/microsoft/testsuites/dpdk/dpdktestpmd.py | Adds _generate_mp_arguments method and extends generate_testpmd_command to support multiprocess role configuration and custom core lists |
| lisa/microsoft/testsuites/dpdk/dpdkutil.py | Adds generate_5tswap_run_info function for 5-tuple-swap test setup, updates return types to support multiple commands per node, and modifies verify_dpdk_send_receive to handle multiple processes |
| lisa/microsoft/testsuites/dpdk/dpdksuite.py | Adds new test case for 5-tuple-swap multiprocess validation with 1GB hugepages |
@mcgov it failed on canonical 0001-com-ubuntu-server-jammy 22_04-lts-gen2 latest; please check whether it is a test case issue or an image issue.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
lisa/microsoft/testsuites/dpdk/dpdkutil.py:662

`start_testpmd_concurrent` now accepts multiple commands per `DpdkTestResources`, but `_collect_dict_result` stores results in `output` keyed only by the kit, so later command outputs will overwrite earlier ones for the same kit. If multi-process output is needed, change the output structure (e.g., `Dict[DpdkTestResources, List[str]]` or key by `(kit, proc_index)`), or explicitly document/rename the function to indicate that only the last output per kit is retained.

```python
def _collect_dict_result(result: Tuple[DpdkTestResources, str]) -> None:
    output[result[0]] = result[1]
```
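The list-per-kit fix suggested here can be sketched as follows; this is a minimal illustration using a string stand-in (`Kit`) for `DpdkTestResources`, not the actual lisa types:

```python
from collections import defaultdict
from typing import DefaultDict, List, Tuple

# "Kit" is a string stand-in for DpdkTestResources (which is hashable in lisa).
Kit = str

output: DefaultDict[Kit, List[str]] = defaultdict(list)


def _collect_dict_result(result: Tuple[Kit, str]) -> None:
    # Append rather than assign, so every command's output per kit survives.
    kit, stdout = result
    output[kit].append(stdout)


# Two processes on the same kit no longer clobber each other.
_collect_dict_result(("sender-kit", "primary txonly stats"))
_collect_dict_result(("sender-kit", "secondary rxonly stats"))
```

Keying by `(kit, proc_index)` is the alternative when callers need to know which process produced which output.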
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
lisa/microsoft/testsuites/dpdk/dpdkutil.py:662

`start_testpmd_concurrent` now accepts a list of commands per test kit, but `_collect_dict_result` stores a single string per kit (`output[result[0]] = result[1]`). If more than one command is provided for the same kit, later-finishing tasks will overwrite earlier outputs non-deterministically, losing logs/results. Consider changing the output type to store a list per kit (or key by `(kit, index)`), and update the collector accordingly.

```python
command_pairs_as_tuples: List[Tuple[DpdkTestResources, str]] = []
kits_and_commands = deque(node_cmd_pairs.items())
for kit_and_commands in kits_and_commands:
    kit, commands = kit_and_commands
    for command in commands:
        command_pairs_as_tuples += [(kit, command)]


def _collect_dict_result(result: Tuple[DpdkTestResources, str]) -> None:
    output[result[0]] = result[1]
```
0c007db to
76be3c0
Compare
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.
Comments suppressed due to low confidence (1)
lisa/microsoft/testsuites/dpdk/dpdkutil.py:669

`start_testpmd_concurrent` now supports multiple commands per test kit, but `_collect_dict_result` stores results in `output` keyed only by `DpdkTestResources`, so later commands overwrite earlier ones. This makes the returned `output` incomplete/ambiguous for multiprocess runs; consider storing a list of outputs per kit (or keying by `(kit, proc_id)` / command index) and updating the return type accordingly.

```python
command_pairs_as_tuples: List[Tuple[DpdkTestResources, str]] = []
kits_and_commands = deque(node_cmd_pairs.items())
for kit_and_commands in kits_and_commands:
    kit, commands = kit_and_commands
    for command in commands:
        command_pairs_as_tuples += [(kit, command)]


def _collect_dict_result(result: Tuple[DpdkTestResources, str]) -> None:
    output[result[0]] = result[1]
```
```python
forwarded_over_received = abs(rcv_tx_pps / rcv_rx_pps)
assert_that(forwarded_over_received).described_as(
    "receiver re-send pps was unexpectedly low!"
).is_close_to(0.8, 0.2)
```
The forwarding ratio check uses hard-coded thresholds in `is_close_to(0.8, 0.2)` without any context for why 80% (±20%) is the expected range. Please document the rationale (e.g., expected overhead, batching behavior) or define named constants so it's clear what behavior/regression this check is intended to catch.
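One way to address this comment is to name the thresholds. The constants and helper below are hypothetical (the actual test inlines `is_close_to(0.8, 0.2)`), and the rationale in the comments is an assumption that the PR author would need to confirm:

```python
# Hypothetical named constants; values mirror the test's is_close_to(0.8, 0.2).
# Rationale comments are assumptions, to be confirmed by the test author.
EXPECTED_FORWARD_RATIO = 0.8   # forwarded pps / received pps at the forwarder
FORWARD_RATIO_TOLERANCE = 0.2  # slack for drops, batching, and sampling skew


def forward_ratio_is_acceptable(rcv_tx_pps: float, rcv_rx_pps: float) -> bool:
    # Mirrors assert_that(ratio).is_close_to(EXPECTED, TOLERANCE) semantics.
    ratio = abs(rcv_tx_pps / rcv_rx_pps)
    return abs(ratio - EXPECTED_FORWARD_RATIO) <= FORWARD_RATIO_TOLERANCE
```

With named constants, a regression report can say which expectation failed instead of surfacing bare numbers.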
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
```python
# Use only cores from NUMA node 0 for better memory locality.
# Reserve core 0 for the OS, use remaining cores split between processes.
sender_lscpu = sender.node.tools[Lscpu]
sender_cpu_info = sender_lscpu.get_cpu_info()
# Get cores on NUMA node 0, excluding core 0 (reserved for OS)
sender_numa0_cores = sorted(
    [cpu.cpu for cpu in sender_cpu_info if cpu.numa_node == 0 and cpu.cpu != 0]
)
# Split cores between primary (txonly) and secondary (rxonly) processes
# Use even-indexed cores for primary, odd-indexed for secondary
primary_cores = sender_numa0_cores[::2]  # every other core starting at index 0
secondary_cores = sender_numa0_cores[1::2]  # every other core starting at index 1

# similarly for receiver - use NUMA node 0 cores only
receiver_lscpu = receiver.node.tools[Lscpu]
receiver_cpu_info = receiver_lscpu.get_cpu_info()
receiver_available_cores = sorted(
    [cpu.cpu for cpu in receiver_cpu_info if cpu.numa_node == 0 and cpu.cpu != 0]
)
```
Restricting core selection to NUMA node 0 can easily produce too few cores (e.g., multi-NUMA VMs where cores are split across nodes), which will then trigger the later core_list size assertion inside `generate_testpmd_command` and fail the test. Consider selecting cores based on the NIC's NUMA node (if available), and/or falling back to using cores across all NUMA nodes when NUMA node 0 doesn't provide enough cores. Also add a clear pre-check here that raises `SkippedException` with an actionable message when there aren't enough usable cores after filtering/splitting.
Suggested change:

```python
# Prefer cores on the NIC's NUMA node for locality, but fall back to
# using cores across all NUMA nodes if that does not leave enough usable
# cores after excluding CPU 0 (reserved for the OS).
def _get_nic_numa_node(nic: NicInfo) -> Optional[int]:
    nic_numa_node = getattr(nic, "numa_node", None)
    return nic_numa_node if isinstance(nic_numa_node, int) else None


def _get_usable_cores(
    cpu_info: List[Any], nic: NicInfo, required_core_count: int
) -> List[int]:
    preferred_numa_node = _get_nic_numa_node(nic)
    if preferred_numa_node is not None:
        preferred_cores = sorted(
            [
                cpu.cpu
                for cpu in cpu_info
                if cpu.numa_node == preferred_numa_node and cpu.cpu != 0
            ]
        )
        if len(preferred_cores) >= required_core_count:
            return preferred_cores
    return sorted([cpu.cpu for cpu in cpu_info if cpu.cpu != 0])


sender_lscpu = sender.node.tools[Lscpu]
sender_cpu_info = sender_lscpu.get_cpu_info()
sender_available_cores = _get_usable_cores(
    sender_cpu_info, snd_nic, required_core_count=2
)
# Split cores between primary (txonly) and secondary (rxonly) processes.
# Use even-indexed cores for primary, odd-indexed for secondary.
primary_cores = sender_available_cores[::2]
secondary_cores = sender_available_cores[1::2]
if not primary_cores or not secondary_cores:
    raise SkippedException(
        "Not enough usable CPU cores for sender multi-process testpmd setup. "
        f"Need at least 2 non-zero cores after NUMA filtering/fallback, but "
        f"found {len(sender_available_cores)} usable cores "
        f"(preferred NIC NUMA node: {_get_nic_numa_node(snd_nic)})."
    )

receiver_lscpu = receiver.node.tools[Lscpu]
receiver_cpu_info = receiver_lscpu.get_cpu_info()
receiver_available_cores = _get_usable_cores(
    receiver_cpu_info, rcv_nic, required_core_count=1
)
if not receiver_available_cores:
    raise SkippedException(
        "Not enough usable CPU cores for receiver testpmd setup. "
        "Need at least 1 non-zero core after NUMA filtering/fallback, but "
        f"found 0 usable cores (preferred NIC NUMA node: "
        f"{_get_nic_numa_node(rcv_nic)})."
    )
```
```python
    "Ubuntu": {
        "20.4.0": "v25.11",
        "22.4.0": "v24.11",
        "24.4.0": "v24.11",
        "25.4.0": "v25.11",
        "26.4.0": "v25.11",
    },
    "Debian": {
        "10.0.0": "v22.11",
        "11.0.0": "v24.11",
        "12.0.0": "v24.11",
        "13.0.0": "v25.11",
    },
    "Redhat": {
        "8.6.0": "v24.11",
        "9.0.0": "v25.11",
    },
    "CentOs": {
        "8.6.0": "v24.11",
        "9.0.0": "v25.11",
    },
}


def get_dpdk_default_source_version(node: Node) -> str:
    # match major.minor os versions for supported distros
    # to lkg dpdk versions for the source installation.
    # Versions are evaluated at >= for the os version.

    os_version = node.os.information.version
    os_match = _dpdk_default_source_dict.get(node.os.name, None)
```
Keying the mapping by `node.os.name` strings is brittle and can silently skip tests if the OS name doesn't match these exact spellings/casing (e.g., 'CentOs' vs common variants like 'CentOS', 'RHEL', etc.). Consider keying by OS classes/types (`isinstance` checks), or centralizing canonical OS-name constants used across the codebase. At minimum, add a small normalization layer (e.g., lowercasing and known aliases) to make the mapping resilient.
Suggested change:

```python
    "ubuntu": {
        "20.4.0": "v25.11",
        "22.4.0": "v24.11",
        "24.4.0": "v24.11",
        "25.4.0": "v25.11",
        "26.4.0": "v25.11",
    },
    "debian": {
        "10.0.0": "v22.11",
        "11.0.0": "v24.11",
        "12.0.0": "v24.11",
        "13.0.0": "v25.11",
    },
    "redhat": {
        "8.6.0": "v24.11",
        "9.0.0": "v25.11",
    },
    "centos": {
        "8.6.0": "v24.11",
        "9.0.0": "v25.11",
    },
}


def _normalize_dpdk_os_name(os_name: str) -> str:
    normalized_name = os_name.strip().lower()
    return {
        "rhel": "redhat",
        "red hat": "redhat",
        "red hat enterprise linux": "redhat",
        "redhatenterpriselinux": "redhat",
        "centos": "centos",
        "centos linux": "centos",
    }.get(normalized_name, normalized_name)


def get_dpdk_default_source_version(node: Node) -> str:
    # match major.minor os versions for supported distros
    # to lkg dpdk versions for the source installation.
    # Versions are evaluated at >= for the os version.
    os_version = node.os.information.version
    normalized_os_name = _normalize_dpdk_os_name(node.os.name)
    os_match = _dpdk_default_source_dict.get(normalized_os_name, None)
```
```python
for version_threshold, dpdk_version in os_match.items():
    if (
        os_version >= version_threshold
        and os_version.major == parse_version(version_threshold).major
    ):
        return dpdk_version
# if we get here, the os version is too old to have a supported dpdk version
```
This selection depends on dict insertion order to pick the intended threshold. If the dict order changes (or is refactored), an OS version could match an unintended threshold within the same major version. Consider iterating thresholds in a deterministic sorted order (by parsed version) so the chosen DPDK version is stable and clearly correct (e.g., select the highest threshold <= os_version within the same major).
Suggested change:

```python
sorted_thresholds = sorted(
    os_match.items(),
    key=lambda item: parse_version(item[0]),
    reverse=True,
)
for version_threshold, dpdk_version in sorted_thresholds:
    parsed_threshold = parse_version(version_threshold)
    if (
        os_version >= parsed_threshold
        and os_version.major == parsed_threshold.major
    ):
        return dpdk_version
# if we get here, the os version is too old to have a supported dpdk version
```
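To see why sorting makes the selection deterministic, the suggested logic can be exercised standalone. Here `parse_version` is a simplified tuple-based stand-in and `pick_dpdk_version` is a hypothetical helper, not the actual function in dpdkutil.py:

```python
from typing import Dict, Optional, Tuple


def parse_version(text: str) -> Tuple[int, ...]:
    # Minimal stand-in for the version parsing used by the real code.
    return tuple(int(part) for part in text.split("."))


def pick_dpdk_version(os_version: str, thresholds: Dict[str, str]) -> Optional[str]:
    parsed_os = parse_version(os_version)
    # Walk thresholds highest-first so the choice never depends on dict order:
    # the highest threshold <= os_version within the same major wins.
    for threshold, dpdk_version in sorted(
        thresholds.items(), key=lambda item: parse_version(item[0]), reverse=True
    ):
        parsed_threshold = parse_version(threshold)
        if parsed_os >= parsed_threshold and parsed_os[0] == parsed_threshold[0]:
            return dpdk_version
    return None


# Subset of the Ubuntu mapping from the diff above.
ubuntu_map = {"20.4.0": "v25.11", "22.4.0": "v24.11", "24.4.0": "v24.11"}
```

For example, "22.10.0" matches the "22.4.0" threshold regardless of how the dict entries are ordered.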
```python
for tx_secondary in [x.wait_result() for x in sender_processes[1:]]:
    log.debug(f"\nSENDER_SECONDARY:\n{tx_secondary.stdout}")
```
The secondary sender processes' results are waited/consumed in `_collect_testpmd_results` and then waited again in `_validate_5tswap_results`. Depending on `Process.wait_result` semantics, this can be redundant at best and error-prone at worst. Additionally, parsing the secondary output via `sender.testpmd.process_testpmd_output` mutates the sender testpmd object's internal parsed metrics (potentially overwriting values from the primary txonly run). Consider collecting and returning the `ProcessResult` objects once (including secondary outputs) and passing them into the 5tswap validation, and/or parsing secondary stats into a separate structure so validation doesn't overwrite the primary sender metrics.
```python
# Verify sender secondary process received forwarded packets
tx_secondary_results = [x.wait_result() for x in sender_processes[1:]]
```
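The collect-once pattern described in this comment can be sketched with stand-in classes. `FakeProcess` and `FakeResult` are illustrative only, not lisa's real `Process` API; the stand-in deliberately forbids a second wait, which is stricter than the real semantics may be:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class FakeResult:
    stdout: str


class FakeProcess:
    """Stand-in for a test process; here wait_result() may be consumed once."""

    def __init__(self, stdout: str) -> None:
        self._result = FakeResult(stdout)
        self._waited = False

    def wait_result(self) -> FakeResult:
        assert not self._waited, "wait_result() called twice on the same process"
        self._waited = True
        return self._result


def collect_sender_results(processes: List[FakeProcess]) -> List[FakeResult]:
    # Wait on each process exactly once; hand the results to validation.
    return [proc.wait_result() for proc in processes]


def validate_secondary_outputs(results: List[FakeResult]) -> bool:
    # Validation consumes already-collected results; it never re-waits.
    return all("RX-packets" in result.stdout for result in results)
```

Parsing the secondary stats into a structure like this, instead of through the shared testpmd object, also avoids clobbering the primary sender's metrics.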
Adds a simpler forwarding test using 2 VMs and 1 port. This test uses multiprocess DPDK to create a sender and receiver process on one VM, and a forwarder process on the other. The forwarder sends all traffic back to the first VM by swapping the MAC and IP addresses.