ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange
Ask Your Question

Revision history [back]

click to hide/show revision 1
initial version

Thank you ljaniec for your answer. In case someone stumbles on a similar issue here is how I solved my problem.

After navigating the ROS2 documentation, I discovered the concept of lifecycle nodes. Here are a few links that helped me understand how to use it :

  • Lifecycle node state graph design : http://design.ros2.org/articles/node_lifecycle.html
  • Lifecycle talker examples: https://github.com/ros2/demos/blob/humble/lifecycle_py/lifecycle_py/talker.py
  • Launch file examples integrating lifecycle nodes: https://github.com/ros2/launch_ros/tree/rolling/ros2launch/examples
  • Event handlers : https://answers.ros.org/question/304370/ros2-launch-how-to-correctly-create-a-lifecycle-launch-file/
  • Callback groups: https://docs.ros.org/en/humble/How-To-Guides/Using-callback-groups.html

The main idea is to create a lifecycle node with different states. The main_gui.py script controls the states. And the talker lifecycle node will publish when it is in the activate state. And stop publishing in the deactivate state.

main_gui.py - Adapted from : https://github.com/ros2/demos/blob/humble/lifecycle_py/lifecycle_py/talker.py

import rclpy
from rclpy.lifecycle import Node
from rclpy.lifecycle import State
from rclpy.lifecycle import TransitionCallbackReturn
from std_msgs.msg import String

class LifecycleTalker(Node):

    def __init__(self):
        super().__init__('play_button')

        self.pub = None
        self.timer = None
        self._count = 0

    def publish(self):
        """Publish a new message when enabled."""
        msg = String()
        msg.data = "Lifecycle HelloWorld #" + str(self._count)
        self._count += 1

        # Only if the publisher is in an active state, the message transfer is
        # enabled and the message actually published.
        if self.pub is not None:
            self.pub.publish(msg)
            self.get_logger().info(f'Lifecycle publisher is active. Published: [{msg.data}]')

    def on_configure(self, state: State) -> TransitionCallbackReturn:
        self.pub = self.create_lifecycle_publisher(String, "lifecycle_chatter", 10)

        self.get_logger().info("on_configure() is called.")
        return super().on_configure(state)

    def on_activate(self, state: State) -> TransitionCallbackReturn:
        # Log, only for demo purposes
        self.get_logger().info("on_activate() is called.")
        self.timer = self.create_timer(1.0, self.publish)

        return super().on_activate(state)

    def on_deactivate(self, state: State) -> TransitionCallbackReturn:
        self.get_logger().info("on_deactivate() is called.")
        self.destroy_timer(self.timer)
        return super().on_deactivate(state)

    def on_cleanup(self, state: State) -> TransitionCallbackReturn:
        self.destroy_timer(self.timer)
        self.destroy_publisher(self.pub)

        self.get_logger().info('on_cleanup() is called.')
        return super().on_cleanup(state)

    def on_shutdown(self, state: State) -> TransitionCallbackReturn:
        self.destroy_timer(self.timer)
        self.destroy_publisher(self.pub)

        self.get_logger().info('on_shutdown() is called.')
        return super().on_shutdown(state)


def main():
    rclpy.init()

    executor = rclpy.executors.SingleThreadedExecutor()
    lc_node = LifecycleTalker()
    executor.add_node(lc_node)
    try:
        executor.spin()
    except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
        lc_node.destroy_node()


if __name__ == '__main__':
    main()

main_gui.py

import PySimpleGUI as sg

import rclpy
from rclpy.node import Node
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.executors import ExternalShutdownException

from lifecycle_msgs.srv import ChangeState
from lifecycle_msgs.msg import Transition


def minimal_layout():
    layout = [[sg.Button('Talk'), sg.Button('Exit')]]
    return layout


    class MainGui(Node):

        def __init__(self):
            super().__init__('gui_client')

            # Initialization of the GUI
            self.layout = minimal_layout()
            self.window = sg.Window('Lifecycle GUI', self.layout, location=(800, 400), finalize=True)
            self.values, self.event = None, ''

            # Initialization of the publisher
            timer_period = 0.1
            self.timer = self.create_timer(timer_period, self.gui_callback)

            # Node's default callback group is mutually exclusive. This would prevent the client response
            # from being processed until the timer callback finished, but the timer callback in this
            # example is waiting for the client response
            self.cb_group = ReentrantCallbackGroup()

            # Initialize the change_state_client that will update the talker state
            self.change_talking_state_cli = self.create_client(ChangeState, '/talker/change_state',
                                                               callback_group=self.cb_group)
            # Transition to configured state
            self.req = ChangeState.Request()
            self.req.transition = Transition(label='configure')

            # Call the on_configure state
            future = self.change_talking_state_cli.call_async(self.req)
            rclpy.spin_until_future_complete(self, future)

            # Initialize the variable that will monitor the talker state
            self.is_talking = False

        async def gui_callback(self):
            self.event, self.values = self.window.read(timeout=10)

            if self.event in ('Exit', 'None'):
                exit()

            elif self.event == 'Talk':
                ######  Here start the talker like it would with `ros2 run py_pubsub talker`

                # Update the transition according to current button state
                next_talking_transition = 'activate' if not self.is_talking else 'deactivate'

                # Create the change state object
                self.req.transition = Transition(label=next_talking_transition)

                # Call to the service
                future_lifecycle = self.change_talking_state_cli.call_async(self.req)
                result_change_state = await future_lifecycle

                # If the request was successful, update the streaming status
                if result_change_state.success:
                    self.is_talking = not self.is_talking
                    button_color = sg.theme_button_color() if not self.is_talking else 'red'
                    self.window['Talk'].update(button_color=button_color)


    def main(args=None):
        rclpy.init(args=args)

        # Create the node
        minimal_subscriber = MainGui()

        try:
            # Spin the node so the callback function is called.
            rclpy.spin(minimal_subscriber)

        except (KeyboardInterrupt, ExternalShutdownException):
            # If the node receive a KeyboardInterrupt command
            pass

        finally:
            # Destroy the node explicitly
            # (optional - Done automatically when node is garbage collected)
            minimal_subscriber.destroy_node()
            rclpy.shutdown()

        rclpy.shutdown()


    if __name__ == '__main__':
        main()

main.launch.py

from launch import LaunchDescription
from launch.actions import EmitEvent
from launch.events import Shutdown
from launch_ros.actions import Node, LifecycleNode


def generate_launch_description():
    ld = LaunchDescription()

    gui_node = LifecycleNode(
        package='minimal_examples',
        executable='main_gui',
        name='main_gui',
        namespace='',
        output='screen',
        on_exit=EmitEvent(event=Shutdown(reason='Window closed'))
    )

    lifecycle_talker_node = LifecycleNode(
        package='minimal_examples',
        executable='lifecycle_talker',
        name='talker',
        namespace='',
        output='screen'
    )

    ld.add_action(gui_node)
    ld.add_action(lifecycle_talker_node)

    return ld

setup.py

import os
from glob import glob
from setuptools import setup


package_name = 'minimal_examples'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
         ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        # Include all launch files.
        (os.path.join('share', package_name), glob('launch/*launch.[pxy][yma]*'))
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    description='Packages that includes minimal example for the ROS forum',
    license='Apache License 2.0',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'main_gui = minimal_examples.main_gui:main',
            'lifecycle_talker = minimal_examples.lifecycle_nodes:main'
        ],
    },
)

If you want to monitor the topic, run : ros2 topic echo /lifecycle_chatter