ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange |
1 | initial version |
Thank you ljaniec for your answer. In case someone stumbles on a similar issue here is how I solved my problem.
After navigating the ROS2 documentation, I discovered the concept of lifecycle nodes. Here are a few links that helped me understand how to use it :
The main idea is to create a lifecycle node with different states. The main_gui.py
script controls the states. And the talker lifecycle node will publish when it is in the activate
state. And stop publishing in the deactivate
state.
main_gui.py
- Adapted from : https://github.com/ros2/demos/blob/humble/lifecycle_py/lifecycle_py/talker.py
import rclpy
from rclpy.lifecycle import Node
from rclpy.lifecycle import State
from rclpy.lifecycle import TransitionCallbackReturn
from std_msgs.msg import String
class LifecycleTalker(Node):
def __init__(self):
super().__init__('play_button')
self.pub = None
self.timer = None
self._count = 0
def publish(self):
"""Publish a new message when enabled."""
msg = String()
msg.data = "Lifecycle HelloWorld #" + str(self._count)
self._count += 1
# Only if the publisher is in an active state, the message transfer is
# enabled and the message actually published.
if self.pub is not None:
self.pub.publish(msg)
self.get_logger().info(f'Lifecycle publisher is active. Published: [{msg.data}]')
def on_configure(self, state: State) -> TransitionCallbackReturn:
self.pub = self.create_lifecycle_publisher(String, "lifecycle_chatter", 10)
self.get_logger().info("on_configure() is called.")
return super().on_configure(state)
def on_activate(self, state: State) -> TransitionCallbackReturn:
# Log, only for demo purposes
self.get_logger().info("on_activate() is called.")
self.timer = self.create_timer(1.0, self.publish)
return super().on_activate(state)
def on_deactivate(self, state: State) -> TransitionCallbackReturn:
self.get_logger().info("on_deactivate() is called.")
self.destroy_timer(self.timer)
return super().on_deactivate(state)
def on_cleanup(self, state: State) -> TransitionCallbackReturn:
self.destroy_timer(self.timer)
self.destroy_publisher(self.pub)
self.get_logger().info('on_cleanup() is called.')
return super().on_cleanup(state)
def on_shutdown(self, state: State) -> TransitionCallbackReturn:
self.destroy_timer(self.timer)
self.destroy_publisher(self.pub)
self.get_logger().info('on_shutdown() is called.')
return super().on_shutdown(state)
def main():
rclpy.init()
executor = rclpy.executors.SingleThreadedExecutor()
lc_node = LifecycleTalker()
executor.add_node(lc_node)
try:
executor.spin()
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
lc_node.destroy_node()
if __name__ == '__main__':
main()
main_gui.py
import PySimpleGUI as sg
import rclpy
from rclpy.node import Node
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.executors import ExternalShutdownException
from lifecycle_msgs.srv import ChangeState
from lifecycle_msgs.msg import Transition
def minimal_layout():
layout = [[sg.Button('Talk'), sg.Button('Exit')]]
return layout
class MainGui(Node):
def __init__(self):
super().__init__('gui_client')
# Initialization of the GUI
self.layout = minimal_layout()
self.window = sg.Window('Lifecycle GUI', self.layout, location=(800, 400), finalize=True)
self.values, self.event = None, ''
# Initialization of the publisher
timer_period = 0.1
self.timer = self.create_timer(timer_period, self.gui_callback)
# Node's default callback group is mutually exclusive. This would prevent the client response
# from being processed until the timer callback finished, but the timer callback in this
# example is waiting for the client response
self.cb_group = ReentrantCallbackGroup()
# Initialize the change_state_client that will update the talker state
self.change_talking_state_cli = self.create_client(ChangeState, '/talker/change_state',
callback_group=self.cb_group)
# Transition to configured state
self.req = ChangeState.Request()
self.req.transition = Transition(label='configure')
# Call the on_configure state
future = self.change_talking_state_cli.call_async(self.req)
rclpy.spin_until_future_complete(self, future)
# Initialize the variable that will monitor the talker state
self.is_talking = False
async def gui_callback(self):
self.event, self.values = self.window.read(timeout=10)
if self.event in ('Exit', 'None'):
exit()
elif self.event == 'Talk':
###### Here start the talker like it would with `ros2 run py_pubsub talker`
# Update the transition according to current button state
next_talking_transition = 'activate' if not self.is_talking else 'deactivate'
# Create the change state object
self.req.transition = Transition(label=next_talking_transition)
# Call to the service
future_lifecycle = self.change_talking_state_cli.call_async(self.req)
result_change_state = await future_lifecycle
# If the request was successful, update the streaming status
if result_change_state.success:
self.is_talking = not self.is_talking
button_color = sg.theme_button_color() if not self.is_talking else 'red'
self.window['Talk'].update(button_color=button_color)
def main(args=None):
rclpy.init(args=args)
# Create the node
minimal_subscriber = MainGui()
try:
# Spin the node so the callback function is called.
rclpy.spin(minimal_subscriber)
except (KeyboardInterrupt, ExternalShutdownException):
# If the node receive a KeyboardInterrupt command
pass
finally:
# Destroy the node explicitly
# (optional - Done automatically when node is garbage collected)
minimal_subscriber.destroy_node()
rclpy.shutdown()
rclpy.shutdown()
if __name__ == '__main__':
main()
main.launch.py
from launch import LaunchDescription
from launch.actions import EmitEvent
from launch.events import Shutdown
from launch_ros.actions import Node, LifecycleNode
def generate_launch_description():
ld = LaunchDescription()
gui_node = LifecycleNode(
package='minimal_examples',
executable='main_gui',
name='main_gui',
namespace='',
output='screen',
on_exit=EmitEvent(event=Shutdown(reason='Window closed'))
)
lifecycle_talker_node = LifecycleNode(
package='minimal_examples',
executable='lifecycle_talker',
name='talker',
namespace='',
output='screen'
)
ld.add_action(gui_node)
ld.add_action(lifecycle_talker_node)
return ld
setup.py
import os
from glob import glob
from setuptools import setup
package_name = 'minimal_examples'
setup(
name=package_name,
version='0.0.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
# Include all launch files.
(os.path.join('share', package_name), glob('launch/*launch.[pxy][yma]*'))
],
install_requires=['setuptools'],
zip_safe=True,
description='Packages that includes minimal example for the ROS forum',
license='Apache License 2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'main_gui = minimal_examples.main_gui:main',
'lifecycle_talker = minimal_examples.lifecycle_nodes:main'
],
},
)
If you want to monitor the topic, run : ros2 topic echo /lifecycle_chatter