ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange
Ask Your Question
1

[ROS 2] How to start and stop a node from a Python script?

asked 2023-02-01 09:50:24 -0500

rmapsps gravatar image

updated 2023-02-22 11:19:41 -0500

Context: I am running a camera using ROS. To facilitate data collection I integrated a PySimpleGui graphical user interface inside a Node, so I can start streaming, recording or playing back data through a user-friendly interface.

Expected behaviour: Therefore, my goal is to execute a Node, when the user clicks on a button. For instance, if the following minimal example, I want to start a publisher Node when the user clicks on the button 'Talk'. And I want to start a subscriber Node when the user clicks on 'Listen'.

Platform : I run ROS2 humble on WSL2 - Ubuntu 22.04.1 LTS

Code: Here is GUI publisher, which will publish a string corresponding to the button name to the topic \gui_command when the user clicks on a button.

import PySimpleGUI as sg
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

def generate_layout():
    layout = [[sg.Button('Talk'), sg.Button('Listen'), sg.Button('Exit')]]
    return layout

class MinimalGuiPublisher(Node):

    def __init__(self):
        super().__init__('gui')

        # Initialization of the GUI
        self.layout = generate_layout()
        self.window = sg.Window('Camera recording', self.layout, location=(800, 400), finalize=True)
        self.values, self.event = None, ''

        # Initialization of the publisher
        self.publisher_ = self.create_publisher(String, 'gui_command', 10)
        timer_period = 0.2
        self.timer = self.create_timer(timer_period, self.gui_callback)

    def gui_callback(self):
        msg = String()

        # Check for any event
        self.event, self.values = self.window.read()
        print(self.event)

        msg.data = str(self.event)

        self.publisher_.publish(msg)
        self.get_logger().info('Publishing: "%s"' % msg.data)

        if self.event in ('Exit', None):
            print('User clicked on Exit')
            exit()

def main(args=None):
    rclpy.init(args=args)

    minimal_publisher = MinimalGuiServer()

    rclpy.spin(minimal_publisher)

    minimal_publisher.destroy_node()

    rclpy.shutdown()

if __name__ == '__main__':
    main()

Here is the GUI subscriber that listens to the command through subscription to the topic \gui_command.

import rclpy
from rclpy.node import Node

from std_msgs.msg import String


class MinimalGuiClient(Node):

    def __init__(self):
        super().__init__('gui_client')
        self.subscription = self.create_subscription(
            String,
            'gui_command',
            self.listener_callback,
            10)
        self.subscription  # prevent unused variable warning

    def listener_callback(self, msg):
        recorder = None
        self.get_logger().info('User clicked on: "%s"' % msg.data)

        if msg.data in ('Exit', 'None'):
            exit()

        elif msg.data == 'Talk':
            ###### Here start the talker like it would with
            ###### ros2 run py_pubsub talker
            pass

        elif msg.data == 'Listen':
            ###### Here start the talker like it would with
            ###### ros2 run py_pubsub listener
            pass


def main(args=None):
    rclpy.init(args=args)

    minimal_subscriber = MinimalGuiClient()

    rclpy.spin(minimal_subscriber)

    minimal_subscriber.destroy_node()

    rclpy.shutdown()


if __name__ == '__main__':
    main()

Question : Is it possible to program the execution of a Node from another Node? If yes, could you please indicate which is the correct way to do so?

I tried to read the doc, and thought about using ros2launch (edited). But it is not clear to me if and how a node or a launch can be triggered by user interaction. Your help would be very much appreciated !!

edit retag flag offensive close merge delete

2 Answers

Sort by ยป oldest newest most voted
1

answered 2023-02-22 13:16:44 -0500

rmapsps gravatar image

Thank you ljaniec for your answer. In case someone stumbles on a similar issue here is how I solved my problem.

After navigating the ROS2 documentation, I discovered the concept of lifecycle nodes. Here are a few links that helped me understand how to use it :

The main idea is to create a lifecycle node with different states. The main_gui.py script controls the states. And the talker lifecycle node will publish when it is in the activate state. And stop publishing in the deactivate state.

main_gui.py- Adapted from : https://github.com/ros2/demos/blob/hu...

import rclpy
from rclpy.lifecycle import Node
from rclpy.lifecycle import State
from rclpy.lifecycle import TransitionCallbackReturn
from std_msgs.msg import String

class LifecycleTalker(Node):

    def __init__(self):
        super().__init__('play_button')

        self.pub = None
        self.timer = None
        self._count = 0

    def publish(self):
        """Publish a new message when enabled."""
        msg = String()
        msg.data = "Lifecycle HelloWorld #" + str(self._count)
        self._count += 1

        # Only if the publisher is in an active state, the message transfer is
        # enabled and the message actually published.
        if self.pub is not None:
            self.pub.publish(msg)
            self.get_logger().info(f'Lifecycle publisher is active. Published: [{msg.data}]')

    def on_configure(self, state: State) -> TransitionCallbackReturn:
        self.pub = self.create_lifecycle_publisher(String, "lifecycle_chatter", 10)

        self.get_logger().info("on_configure() is called.")
        return super().on_configure(state)

    def on_activate(self, state: State) -> TransitionCallbackReturn:
        # Log, only for demo purposes
        self.get_logger().info("on_activate() is called.")
        self.timer = self.create_timer(1.0, self.publish)

        return super().on_activate(state)

    def on_deactivate(self, state: State) -> TransitionCallbackReturn:
        self.get_logger().info("on_deactivate() is called.")
        self.destroy_timer(self.timer)
        return super().on_deactivate(state)

    def on_cleanup(self, state: State) -> TransitionCallbackReturn:
        self.destroy_timer(self.timer)
        self.destroy_publisher(self.pub)

        self.get_logger().info('on_cleanup() is called.')
        return super().on_cleanup(state)

    def on_shutdown(self, state: State) -> TransitionCallbackReturn:
        self.destroy_timer(self.timer)
        self.destroy_publisher(self.pub)

        self.get_logger().info('on_shutdown() is called.')
        return super().on_shutdown(state)


def main():
    rclpy.init()

    executor = rclpy.executors.SingleThreadedExecutor()
    lc_node = LifecycleTalker()
    executor.add_node(lc_node)
    try:
        executor.spin()
    except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
        lc_node.destroy_node()


if __name__ == '__main__':
    main()

main_gui.py

import PySimpleGUI as sg

import rclpy
from rclpy.node import Node
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.executors import ExternalShutdownException

from lifecycle_msgs.srv import ChangeState
from lifecycle_msgs.msg import Transition


def minimal_layout():
    layout = [[sg.Button('Talk'), sg.Button('Exit')]]
    return layout


    class MainGui(Node):

        def __init__(self):
            super().__init__('gui_client')

            # Initialization of the GUI
            self.layout = minimal_layout()
            self.window = sg.Window('Lifecycle GUI', self.layout, location=(800, 400), finalize=True)
            self.values, self.event = None, ''

            # Initialization of the publisher
            timer_period = 0.1
            self.timer = self.create_timer(timer_period, self.gui_callback)

            # Node's default callback group is mutually exclusive ...
(more)
edit flag offensive delete link more

Comments

Really nice solution, I hadn't thought of Lifecycle Nodes but they fit perfectly there, well done!

ljaniec gravatar image ljaniec  ( 2023-02-22 18:20:37 -0500 )edit
0

answered 2023-02-13 02:14:34 -0500

ljaniec gravatar image

updated 2023-02-13 02:26:32 -0500

I would try to prepare correct launchers and then use launch API to start the chosen node with its launch file in the service call (rather than topic subscriber callback).

One thing for sure needs to be mentioned - you don't want to use roslaunch, it's a ROS1 version of launcher. You want to use ros2 launch <your-package> <your-launch-file>.

Example sources for writing launchers:

One of the other suggested approach to user - ROS2 interaction interface is presented here (QML):

Perhaps you can check it and see how the inner code is working and compare it to your ideas.

edit flag offensive delete link more

Question Tools

2 followers

Stats

Asked: 2023-02-01 09:50:24 -0500

Seen: 3,062 times

Last updated: Feb 22 '23