ros2 launch lifecycle callback component
It turns out that launching a lifecycle node which is also a component is way harder than expected.
This link shows how to launch a lifecycle, but does not accept a component for the changestate callbacks, obviously. https://raw.githubusercontent.com/ros2/launchros/foxy/launchros/examples/lifecyclepubsublaunch.py
This link is an example of how to launch a component, but does not accept a lifecycle node. https://github.com/astuff/pacmod3/blob/dashing-devel/launch/pacmod3_with_kvaser.py
I tried to add a client node to the launch file which should call the change_state service, but lifecycle nodes are not yet supported in python.
Is there a way to combine the two, so that I can launch a lifecycle component and automatically execute callbacks when it reaches certain states?
Or maybe, can such an event handler be added to an already existing node/service?
register_event_handler_for_talker_reaches_inactive_state = launch.actions.RegisterEventHandler(
launch_ros.event_handlers.OnStateTransition(
target_lifecycle_node=talker_node, goal_state='inactive',
entities=[
launch.actions.LogInfo(
msg="node 'talker' reached the 'inactive' state, 'activating'."),
launch.actions.EmitEvent(event=launch_ros.events.lifecycle.ChangeState(
lifecycle_node_matcher=launch.events.matches_action(talker_node),
transition_id=lifecycle_msgs.msg.Transition.TRANSITION_ACTIVATE,
)),
],
)
)
Asked by highmax1234 on 2021-05-11 09:56:44 UTC
Answers
I did not manage to solve it the way I wanted to, but at least I found a way to call services from the launch file. You can just import the CLI service call and imitate a command line. Alternatively, it is of course possible to create a custom node, but it adds complexity to the launch file.
This "solution" lacks a watchdog to cancel the thread after a timeout, as well as error handling if the service call was unsuccessful Both may be added rather easily to the following:
#!/usr/bin/python3
import launch
import launch.actions
import launch_ros.actions
from launch.substitutions import EnvironmentVariable
from launch_ros.actions.lifecycle_node import LifecycleNode
from launch_ros.descriptions import ComposableNode
from launch_ros.actions import ComposableNodeContainer, Node
import launch_ros.events.lifecycle # noqa: E402
import sys
import threading
from ros2service.verb.call import requester
compose_namespace = 'lifecycle_namespace'
compose_name = 'my_lifecycle_node'
class service_call:
def __init__(self,values):
self.service_type = 'lifecycle_msgs/srv/ChangeState'
self.service_name = '/' + compose_namespace + '/' + compose_name + '/change_state'
self.values = "{transition: {id: 0, label: '" + values + "'}}"
self.period = None
def execute_calls(service_calls):
for call in service_calls:
requester(call.service_type,call.service_name,call.values,call.period)
def generate_launch_description():
ld = launch.LaunchDescription()
my_node = ComposableNode(
package="my_lifecycle_package",
plugin="my_lifecycle_package::lifecycle_plugin",
name=compose_name,
namespace=compose_namespace,
parameters=[
],
remappings=[
],
)
container = ComposableNodeContainer(
name='receiver_simulation',
namespace='/receiver_simulation',
package='rclcpp_components',
executable='component_container',
composable_node_descriptions=[
my_node
]
)
ld.add_action(container)
return ld
def main(argv=sys.argv[1:]):
service_calls = []
service_calls.append(service_call('configure'))
service_calls.append(service_call('activate'))
driver = threading.Thread(target=execute_calls,args=(service_calls,))
driver.start()
ld = generate_launch_description()
ls = launch.LaunchService(argv=argv, debug=False)
ls.include_launch_description(ld)
ls.run()
driver.join()
if __name__ == "__main__":
main()
Asked by highmax1234 on 2021-05-11 13:54:55 UTC
Comments