Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions .github/workflows/build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
run: |
set -e
if git diff --name-only ${{ github.event.pull_request.base.sha }} ${{ github.event.pull_request.head.sha }} \
| grep -qE '*\.(cpp|hpp)$'; then
| grep -qE '.*\.(cpp|hpp)$'; then
echo "cpp_changed=true" >> $GITHUB_OUTPUT
else
echo "cpp_changed=false" >> $GITHUB_OUTPUT
Expand Down Expand Up @@ -79,7 +79,20 @@ jobs:
source /opt/ros/${ROS_DISTRO}/setup.bash
colcon build --parallel-workers 1 --merge-install --cmake-args -DCMAKE_EXPORT_COMPILE_COMMANDS=1

# TODO: run tests
- name: Test
if: steps.check_diff_cpp.outputs.cpp_changed == 'true'
run: |
Comment thread
atsushi421 marked this conversation as resolved.
source /opt/ros/${ROS_DISTRO}/setup.bash
source install/setup.bash
# Run only the functional launch test. Code style is enforced separately
# by the run-pre-commit workflow (clang-format), so the ament linters are
# excluded here via the launch_test label.
colcon test \
--packages-select callback_isolated_executor \
--merge-install \
--event-handlers console_direct+ \
--ctest-args -L launch_test
colcon test-result --verbose

# Summary job that provides a single status check for branch protection rules.
# Matrix jobs report status as "build-and-test-matrix (<distro>)", but branch protection
Expand Down
14 changes: 14 additions & 0 deletions callback_isolated_executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,18 @@ ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET)
ament_export_include_directories(include)
ament_export_dependencies(cie_thread_configurator cie_config_msgs)

if(BUILD_TESTING)
find_package(launch_testing_ament_cmake REQUIRED)

add_executable(cie_pubsub_test_node test/cie_pubsub_test_node.cpp)
target_link_libraries(cie_pubsub_test_node callback_isolated_executor)
ament_target_dependencies(cie_pubsub_test_node rclcpp std_msgs)

install(TARGETS cie_pubsub_test_node
RUNTIME DESTINATION lib/${PROJECT_NAME}
)

add_launch_test(test/test_cie_pubsub.launch.py)
endif()

ament_package()
3 changes: 3 additions & 0 deletions callback_isolated_executor/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<test_depend>launch_testing_ament_cmake</test_depend>
<test_depend>launch_ros</test_depend>
<test_depend>rclpy</test_depend>

<depend>rclcpp</depend>
<depend>rclcpp_components</depend>
Expand Down
71 changes: 71 additions & 0 deletions callback_isolated_executor/test/cie_pubsub_test_node.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#include <chrono>
#include <memory>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/int32.hpp"

#include "callback_isolated_executor/callback_isolated_executor.hpp"

using namespace std::chrono_literals;

class TalkerNode : public rclcpp::Node {
public:
TalkerNode() : Node("cie_test_talker"), count_(0) {
publisher_ = create_publisher<std_msgs::msg::Int32>("chatter", 10);
timer_group_ =
create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
timer_ = create_wall_timer(
100ms,
[this]() {
auto message = std_msgs::msg::Int32();
message.data = count_++;
publisher_->publish(message);
},
timer_group_);
}

private:
rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr publisher_;
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::CallbackGroup::SharedPtr timer_group_;
int32_t count_;
};

class ListenerNode : public rclcpp::Node {
public:
ListenerNode() : Node("cie_test_listener") {
echo_publisher_ = create_publisher<std_msgs::msg::Int32>("echo", 10);
sub_group_ =
create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
rclcpp::SubscriptionOptions sub_options;
sub_options.callback_group = sub_group_;
subscription_ = create_subscription<std_msgs::msg::Int32>(
"chatter", 10,
[this](const std_msgs::msg::Int32::SharedPtr msg) {
echo_publisher_->publish(*msg);
},
sub_options);
}

private:
rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr subscription_;
rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr echo_publisher_;
rclcpp::CallbackGroup::SharedPtr sub_group_;
};

int main(int argc, char *argv[]) {
// rclcpp::init() must precede constructing CallbackIsolatedExecutor: its
// constructor creates an internal publisher node.
rclcpp::init(argc, argv);
{
auto talker = std::make_shared<TalkerNode>();
auto listener = std::make_shared<ListenerNode>();
auto executor = std::make_shared<CallbackIsolatedExecutor>();
executor->add_node(talker);
executor->add_node(listener);
// Blocks until SIGINT (launch_testing teardown) flips rclcpp::ok() false.
executor->spin();
}
rclcpp::shutdown();
return 0;
}
119 changes: 119 additions & 0 deletions callback_isolated_executor/test/test_cie_pubsub.launch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import unittest

import launch
import launch_ros.actions
import launch_testing.actions
import launch_testing.asserts
import launch_testing.markers
import pytest
import rclpy
from cie_config_msgs.msg import CallbackGroupInfo
from rclpy.node import Node
from rclpy.qos import (
QoSDurabilityPolicy,
QoSHistoryPolicy,
QoSProfile,
QoSReliabilityPolicy,
)
from std_msgs.msg import Int32


@pytest.mark.launch_test
@launch_testing.markers.keep_alive
def generate_test_description():
cie_pubsub_node = launch_ros.actions.Node(
package="callback_isolated_executor",
executable="cie_pubsub_test_node",
output="screen",
)
Comment thread
Copilot marked this conversation as resolved.
return (
launch.LaunchDescription(
[cie_pubsub_node, launch_testing.actions.ReadyToTest()]
),
{"cie_pubsub_node": cie_pubsub_node},
)


class TestCiePubSub(unittest.TestCase):
@classmethod
def setUpClass(cls):
rclpy.init()

@classmethod
def tearDownClass(cls):
rclpy.shutdown()

def setUp(self):
self.node = Node("cie_pubsub_test_observer")

def tearDown(self):
self.node.destroy_node()

def _spin_until(self, predicate, timeout_sec):
end_time = self.node.get_clock().now().nanoseconds + int(
timeout_sec * 1_000_000_000
)
while (
not predicate()
and self.node.get_clock().now().nanoseconds < end_time
):
rclpy.spin_once(self.node, timeout_sec=0.1)

def test_echo_messages_flow(self):
received = []
self.node.create_subscription(
Int32, "echo", lambda msg: received.append(msg.data), 10
)

self._spin_until(lambda: len(received) >= 5, timeout_sec=15.0)

self.assertGreaterEqual(
len(received),
5,
f"Expected >= 5 echo messages, got {len(received)}: {received}",
)
# Values come from an incrementing counter; the observed subsequence must
# be strictly increasing. We do not require start==0 because discovery
# latency may drop the first few messages.
for earlier, later in zip(received, received[1:]):
self.assertLess(earlier, later)

def test_callback_groups_isolated(self):
# CIE publishes one (callback_group_id, thread_id) per callback group on a
# transient_local topic, so a late-joining subscriber still receives them.
# Distinct thread ids prove the callbacks are isolated onto separate threads.
infos = []
qos = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
history=QoSHistoryPolicy.KEEP_ALL,
)
self.node.create_subscription(
CallbackGroupInfo,
"/cie_thread_configurator/callback_group_info",
lambda msg: infos.append((msg.callback_group_id, msg.thread_id)),
qos,
)

self._spin_until(lambda: len(infos) >= 2, timeout_sec=15.0)

self.assertGreaterEqual(
len(infos),
2,
f"Expected >= 2 callback group infos, got {infos}",
)
thread_ids = {thread_id for _, thread_id in infos}
self.assertGreaterEqual(
len(thread_ids),
2,
f"Expected callbacks isolated across >= 2 threads, got {infos}",
)


@launch_testing.post_shutdown_test()
class TestCiePubSubShutdown(unittest.TestCase):
def test_exit_code(self, proc_info, cie_pubsub_node):
# 0 = clean return after spin(); -2 = terminated by SIGINT (distro-dependent).
launch_testing.asserts.assertExitCodes(
proc_info, allowable_exit_codes=[0, -2], process=cie_pubsub_node
)
Loading