Ask Your Question
1

ros2 launch lifecycle callback component

asked 2021-05-11 09:56:44 -0500

highmax1234 gravatar image

updated 2021-05-11 09:57:56 -0500

It turns out that launching a lifecycle node which is also a component is way harder than expected.

This link shows how to launch a lifecycle, but does not accept a component for the change_state callbacks, obviously. https://raw.githubusercontent.com/ros...

This link is an example of how to launch a component, but does not accept a lifecycle node. https://github.com/astuff/pacmod3/blo...

I tried to add a client node to the launch file which should call the change_state service, but lifecycle nodes are not yet supported in python.

Is there a way to combine the two, so that I can launch a lifecycle component and automatically execute callbacks when it reaches certain states?

Or maybe, can such an event handler be added to an already existing node/service?

register_event_handler_for_talker_reaches_inactive_state = launch.actions.RegisterEventHandler(
        launch_ros.event_handlers.OnStateTransition(
            target_lifecycle_node=talker_node, goal_state='inactive',
            entities=[
                launch.actions.LogInfo(
                    msg="node 'talker' reached the 'inactive' state, 'activating'."),
                launch.actions.EmitEvent(event=launch_ros.events.lifecycle.ChangeState(
                    lifecycle_node_matcher=launch.events.matches_action(talker_node),
                    transition_id=lifecycle_msgs.msg.Transition.TRANSITION_ACTIVATE,
                )),
            ],
        )
    )
edit retag flag offensive close merge delete

1 Answer

Sort by ยป oldest newest most voted
0

answered 2021-05-11 13:54:55 -0500

highmax1234 gravatar image

updated 2021-05-11 14:02:35 -0500

I did not manage to solve it the way I wanted to, but at least I found a way to call services from the launch file. You can just import the CLI service call and imitate a command line. Alternatively, it is of course possible to create a custom node, but it adds complexity to the launch file.

This "solution" lacks a watchdog to cancel the thread after a timeout, as well as error handling if the service call was unsuccessful Both may be added rather easily to the following:

 #!/usr/bin/python3
import launch
import launch.actions
import launch_ros.actions
from launch.substitutions import EnvironmentVariable
from launch_ros.actions.lifecycle_node import LifecycleNode 
from launch_ros.descriptions import ComposableNode
from launch_ros.actions import ComposableNodeContainer, Node
import launch_ros.events.lifecycle  # noqa: E402
import sys
import threading
from ros2service.verb.call import requester

compose_namespace = 'lifecycle_namespace'
compose_name = 'my_lifecycle_node'

class service_call:
    def __init__(self,values):
        self.service_type = 'lifecycle_msgs/srv/ChangeState'
        self.service_name = '/' + compose_namespace + '/' + compose_name + '/change_state'
        self.values = "{transition: {id: 0, label: '" + values + "'}}" 
        self.period = None

def execute_calls(service_calls):
    for call in service_calls:
        requester(call.service_type,call.service_name,call.values,call.period)

def generate_launch_description():
    ld = launch.LaunchDescription()
    my_node = ComposableNode(
                package="my_lifecycle_package",
                plugin="my_lifecycle_package::lifecycle_plugin",
                name=compose_name,
                namespace=compose_namespace,
                parameters=[
                ],
                remappings=[
                ],
            )

    container = ComposableNodeContainer(
        name='receiver_simulation',
        namespace='/receiver_simulation',
        package='rclcpp_components',
        executable='component_container',
        composable_node_descriptions=[
            my_node
        ]
    )
    ld.add_action(container)
    return ld

def main(argv=sys.argv[1:]):
    service_calls = []
    service_calls.append(service_call('configure'))
    service_calls.append(service_call('activate'))

    driver = threading.Thread(target=execute_calls,args=(service_calls,))
    driver.start()

    ld = generate_launch_description()
    ls = launch.LaunchService(argv=argv, debug=False)
    ls.include_launch_description(ld)
    ls.run()
    driver.join()

if __name__ == "__main__":
    main()
edit flag offensive delete link more

Your Answer

Please start posting anonymously - your entry will be published after you log in or create a new account.

Add Answer

Question Tools

2 followers

Stats

Asked: 2021-05-11 09:56:44 -0500

Seen: 36 times

Last updated: May 11