Skip to content

Commit 8a2b2fd

Browse files
authored
Add option to prevent auto-termination after MCAP playback (#31)
* Fix parameter passing error * run.default with params now works written in README.md * Add ignore_playback_finish options * To meet our requirements, controlling shutdown condition in `Run` class is needed * Add ignore_playback_finish in README * Add test for replay finish condition * run pre-commit
1 parent 5dafad7 commit 8a2b2fd

File tree

5 files changed

+117
-15
lines changed

5 files changed

+117
-15
lines changed

README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,24 @@ When `use_clock=True` (default), the replay framework will:
146146
When `use_clock=False`, the replay will:
147147
- Skip `/clock` topic publishing
148148

149+
#### Termination Condition
150+
151+
By default, the replay will run until all messages from the input MCAP are played back.
152+
After that, the `run` stops immediately and the `analyze` step is executed.
153+
154+
To wait for `run` to finish instead of stopping immediately after playback, you can set a `ignore_playback_finish` flag in `ReplayRunParams`:
155+
156+
```python
157+
from replay_testing import ReplayRunParams
158+
159+
@run.default(params=ReplayRunParams(name='default', ignore_playback_finish=True))
160+
class Run:
161+
def generate_launch_description(self) -> LaunchDescription:
162+
# Your launch description here
163+
# Note: Every node in the launch description must finish on its own. Otherwise, the test will hang.
164+
pass
165+
```
166+
149167
### Analyze `@analyze`
150168

151169
The analyze step is run after the mcap from the `run` is recorded and written. It is a basic wrapper over `unittest.TestCase`, so any `unittest` assertions are built in.

replay_testing/decorators/run.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,5 +52,5 @@ def parameterize(parameters: list[ReplayRunParams]):
5252
return run(parameters=parameters)
5353

5454
@staticmethod
55-
def default(params: dict = {}):
56-
return run(parameters=[ReplayRunParams(name='default', params=params)])
55+
def default(params: ReplayRunParams = ReplayRunParams(name='default')):
56+
return run(parameters=[params])

replay_testing/models.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ class RunnerArgs(BaseModel):
3434

3535
class ReplayRunParams(BaseModel):
3636
name: str
37-
params: dict
37+
params: dict = {}
3838
runner_args: RunnerArgs = RunnerArgs()
39+
ignore_playback_finish: bool = False
3940

4041

4142
class Mcap(BaseModel):

replay_testing/replay_runner.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -139,26 +139,30 @@ def _create_run_launch_description(
139139
output='screen',
140140
)
141141

142-
# Event handler to gracefully exit when the process finishes
143-
on_exit_handler = RegisterEventHandler(
144-
OnProcessExit(
145-
target_action=player_action,
146-
# Shutdown the launch service
147-
on_exit=[launch.actions.EmitEvent(event=Shutdown())],
148-
)
149-
)
150-
151-
# Launch description with the event handler
152-
return LaunchDescription([
142+
# Launch description
143+
ld = LaunchDescription([
153144
ExecuteProcess(
154145
cmd=['ros2', 'bag', 'record', '-s', 'mcap', '-o', str(run_fixture.path), '--all'],
155146
output='screen',
156147
),
157148
test_ld,
158149
player_action, # Add the MCAP playback action
159-
on_exit_handler, # Add the event handler to shutdown after playback finishes
160150
])
161151

152+
if not params.ignore_playback_finish:
153+
# Event handler to gracefully exit when the process finishes
154+
on_exit_handler = RegisterEventHandler(
155+
OnProcessExit(
156+
target_action=player_action,
157+
# Shutdown the launch service
158+
on_exit=[launch.actions.EmitEvent(event=Shutdown())],
159+
)
160+
)
161+
162+
ld.add_action(on_exit_handler)
163+
164+
return ld
165+
162166
def filter_fixtures(self) -> list[ReplayFixture]:
163167
self._log_stage_start(ReplayTestingPhase.FIXTURES)
164168

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# Copyright (c) 2025-present Polymath Robotics, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
16+
import pathlib
17+
18+
from launch import LaunchDescription
19+
from launch.actions import EmitEvent, ExecuteProcess, RegisterEventHandler
20+
from launch.event_handlers import OnProcessExit
21+
from launch.events import Shutdown
22+
23+
from replay_testing import (
24+
LocalFixture,
25+
ReplayRunParams,
26+
analyze,
27+
fixtures,
28+
read_messages,
29+
run,
30+
)
31+
32+
cmd_vel_only_fixture = pathlib.Path(__file__).parent.parent / 'fixtures' / 'cmd_vel_only.mcap'
33+
34+
35+
@fixtures.parameterize([LocalFixture(path=cmd_vel_only_fixture)])
36+
class FilterFixtures:
37+
required_input_topics = ['/vehicle/cmd_vel']
38+
expected_output_topics = ['/user/cmd_vel']
39+
40+
41+
@run.default(params=ReplayRunParams(name='default', ignore_playback_finish=True))
42+
class Run:
43+
def generate_launch_description(self) -> LaunchDescription:
44+
target_process = ExecuteProcess(
45+
cmd=[
46+
'ros2',
47+
'topic',
48+
'pub',
49+
'-r',
50+
'10',
51+
'-t',
52+
'20',
53+
'/user/cmd_vel',
54+
'geometry_msgs/msg/Twist',
55+
'{linear: {x: 1.0}, angular: {z: 0.5}}',
56+
],
57+
name='topic_pub',
58+
output='screen',
59+
)
60+
on_exit_handler = RegisterEventHandler(
61+
OnProcessExit(
62+
target_action=target_process,
63+
# Shutdown the launch service
64+
on_exit=[EmitEvent(event=Shutdown())],
65+
)
66+
)
67+
return LaunchDescription([
68+
target_process,
69+
on_exit_handler,
70+
])
71+
72+
73+
@analyze
74+
class AnalyzeBasicReplay:
75+
def test_expected_failure(self):
76+
msgs_it = read_messages(self.reader, topics=['/user/cmd_vel'])
77+
78+
msgs = [(topic_name, msg, timestamp) for topic_name, msg, timestamp in msgs_it]
79+
assert len(msgs) == 20

0 commit comments

Comments
 (0)