ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange
Ask Your Question

Revision history [back]

click to hide/show revision 1
initial version

I did not manage to solve it the way I wanted to, but at least I found a way to call services from the launch file. You can just import the CLI service call and imitate a command line. Alternatively, it is of course possible to create a custom node, but it adds complexity to the launch file.

This "solution" lacks a watchdog to cancel the thread after a timeout, as well as error handling if the service call was unsuccessful Both may be added rather easily to the following:

 #!/usr/bin/python3
import launch
import launch.actions
import launch.substitutions
import launch_ros.actions
from ament_index_python.packages import get_package_share_directory
from launch.actions import ExecuteProcess, SetEnvironmentVariable
from launch.substitutions import EnvironmentVariable
from launch_ros.actions.lifecycle_node import LifecycleNode 
from launch_ros.descriptions import ComposableNode
from launch_ros.actions import ComposableNodeContainer, Node
import launch_ros.events.lifecycle  # noqa: E402
import sys
from time import sleep
import threading
from ros2service.verb.call import requester

compose_namespace = 'lifecycle_namespace'
compose_name = 'my_lifecycle_node'

class service_call:
    def __init__(self,values):
        self.service_type = 'lifecycle_msgs/srv/ChangeState'
        self.service_name = '/' + compose_namespace + '/' + compose_name + '/change_state'
        self.values = "{transition: {id: 0, label: '" + values + "'}}" 
        self.period = None

def execute_calls(service_calls):
    for call in service_calls:
        requester(call.service_type,call.service_name,call.values,call.period)

def generate_launch_description():
    ld = launch.LaunchDescription()
    my_node = ComposableNode(
                package="my_lifecycle_package",
                plugin="my_lifecycle_package::lifecycle_plugin",
                name=compose_name,
                namespace=compose_namespace,
                parameters=[
                ],
                remappings=[
                ],
            )

    container = ComposableNodeContainer(
        name='receiver_simulation',
        namespace='/receiver_simulation',
        package='rclcpp_components',
        executable='component_container',
        composable_node_descriptions=[
            my_node
        ]
    )
    ld.add_action(load_receiver)
    return ld

def main(argv=sys.argv[1:]):
    service_calls = []
    service_calls.append(service_call('configure'))
    service_calls.append(service_call('activate'))

    driver = threading.Thread(target=execute_calls,args=(service_calls,))
    driver.start()

    ld = generate_launch_description()
    ls = launch.LaunchService(argv=argv, debug=False)
    ls.include_launch_description(ld)
    ls.run()
    driver.join()

if __name__ == "__main__":
    main()

I did not manage to solve it the way I wanted to, but at least I found a way to call services from the launch file. You can just import the CLI service call and imitate a command line. Alternatively, it is of course possible to create a custom node, but it adds complexity to the launch file.

This "solution" lacks a watchdog to cancel the thread after a timeout, as well as error handling if the service call was unsuccessful Both may be added rather easily to the following:

 #!/usr/bin/python3
import launch
import launch.actions
import launch.substitutions
import launch_ros.actions
from ament_index_python.packages import get_package_share_directory
from launch.actions import ExecuteProcess, SetEnvironmentVariable
from launch.substitutions import EnvironmentVariable
from launch_ros.actions.lifecycle_node import LifecycleNode 
from launch_ros.descriptions import ComposableNode
from launch_ros.actions import ComposableNodeContainer, Node
import launch_ros.events.lifecycle  # noqa: E402
import sys
from time import sleep
import threading
from ros2service.verb.call import requester

compose_namespace = 'lifecycle_namespace'
compose_name = 'my_lifecycle_node'

class service_call:
    def __init__(self,values):
        self.service_type = 'lifecycle_msgs/srv/ChangeState'
        self.service_name = '/' + compose_namespace + '/' + compose_name + '/change_state'
        self.values = "{transition: {id: 0, label: '" + values + "'}}" 
        self.period = None

def execute_calls(service_calls):
    for call in service_calls:
        requester(call.service_type,call.service_name,call.values,call.period)

def generate_launch_description():
    ld = launch.LaunchDescription()
    my_node = ComposableNode(
                package="my_lifecycle_package",
                plugin="my_lifecycle_package::lifecycle_plugin",
                name=compose_name,
                namespace=compose_namespace,
                parameters=[
                ],
                remappings=[
                ],
            )

    container = ComposableNodeContainer(
        name='receiver_simulation',
        namespace='/receiver_simulation',
        package='rclcpp_components',
        executable='component_container',
        composable_node_descriptions=[
            my_node
        ]
    )
    ld.add_action(load_receiver)
ld.add_action(container)
    return ld

def main(argv=sys.argv[1:]):
    service_calls = []
    service_calls.append(service_call('configure'))
    service_calls.append(service_call('activate'))

    driver = threading.Thread(target=execute_calls,args=(service_calls,))
    driver.start()

    ld = generate_launch_description()
    ls = launch.LaunchService(argv=argv, debug=False)
    ls.include_launch_description(ld)
    ls.run()
    driver.join()

if __name__ == "__main__":
    main()

I did not manage to solve it the way I wanted to, but at least I found a way to call services from the launch file. You can just import the CLI service call and imitate a command line. Alternatively, it is of course possible to create a custom node, but it adds complexity to the launch file.

This "solution" lacks a watchdog to cancel the thread after a timeout, as well as error handling if the service call was unsuccessful Both may be added rather easily to the following:

 #!/usr/bin/python3
import launch
import launch.actions
import launch.substitutions
import launch_ros.actions
from ament_index_python.packages import get_package_share_directory
from launch.actions import ExecuteProcess, SetEnvironmentVariable
from launch.substitutions import EnvironmentVariable
from launch_ros.actions.lifecycle_node import LifecycleNode 
from launch_ros.descriptions import ComposableNode
from launch_ros.actions import ComposableNodeContainer, Node
import launch_ros.events.lifecycle  # noqa: E402
import sys
from time import sleep
import threading
from ros2service.verb.call import requester

compose_namespace = 'lifecycle_namespace'
compose_name = 'my_lifecycle_node'

class service_call:
    def __init__(self,values):
        self.service_type = 'lifecycle_msgs/srv/ChangeState'
        self.service_name = '/' + compose_namespace + '/' + compose_name + '/change_state'
        self.values = "{transition: {id: 0, label: '" + values + "'}}" 
        self.period = None

def execute_calls(service_calls):
    for call in service_calls:
        requester(call.service_type,call.service_name,call.values,call.period)

def generate_launch_description():
    ld = launch.LaunchDescription()
    my_node = ComposableNode(
                package="my_lifecycle_package",
                plugin="my_lifecycle_package::lifecycle_plugin",
                name=compose_name,
                namespace=compose_namespace,
                parameters=[
                ],
                remappings=[
                ],
            )

    container = ComposableNodeContainer(
        name='receiver_simulation',
        namespace='/receiver_simulation',
        package='rclcpp_components',
        executable='component_container',
        composable_node_descriptions=[
            my_node
        ]
    )
    ld.add_action(container)
    return ld

def main(argv=sys.argv[1:]):
    service_calls = []
    service_calls.append(service_call('configure'))
    service_calls.append(service_call('activate'))

    driver = threading.Thread(target=execute_calls,args=(service_calls,))
    driver.start()

    ld = generate_launch_description()
    ls = launch.LaunchService(argv=argv, debug=False)
    ls.include_launch_description(ld)
    ls.run()
    driver.join()

if __name__ == "__main__":
    main()