ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange
Ask Your Question
1

[ROS2] asyncio await with timeout a service call in a callback

asked 2023-03-16 18:08:34 -0500

charlie92 gravatar image

updated 2023-03-17 10:26:50 -0500

Hi! I am trying to use the asyncio syntax to await for a service to complete inside a callback function of a Node. A dummy node to test this:

#!/usr/bin/env python3
import rclpy
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from std_msgs.msg import Empty
from rcl_interfaces.srv import ListParameters
import asyncio

class TestNode(Node):
    def __init__(self) -> None:
        Node.__init__(
            self,
            "test_node",
        )
        self.callback_group = ReentrantCallbackGroup()
        self._sub = self.create_subscription(
            topic="test",
            msg_type=Empty,
            callback=self.cb_test,
            qos_profile=5,
            callback_group=self.callback_group,
        )

        self.client = self.create_client(
            ListParameters,
            "/test_node/list_parameters"
        )

    async def cb_test(self, msg: Empty) -> None:

        service_request = ListParameters.Request()
        srv_future = self.client.call_async(service_request)
        try:
            # result = await asyncio.wait_for(srv_future, timeout=10.0)
            result = await srv_future
        except Exception as e:
            self.get_logger().error(f"Service call failed! what(): {str(e)}")
        else:
            self.get_logger().info(f"Service call success! what(): {str(result)}")


def main(args=None):
    rclpy.init(args=args)

    node = TestNode()
    executor = SingleThreadedExecutor()

    rclpy.spin(node, executor)
    node.destroy_node()
    rclpy.try_shutdown()


if __name__ == "__main__":
    main()

You can test it with

ros2 topic pub -1 /test std_msgs/msg/Empty {}

That code works just fine. The callback function doesn't block any other callback functions and it returns the result of the service. Now If I want to add a timeout to that future with asyncio.wait_for(future, timeout=10.0) it fails with:

[ERROR][1679007966.209838027][test_node]: Service call failed! what(): no running event loop

I tried everything, spawning a new event_loop in a separate thread, calling event_loop.run_forever in the main thread, and spawning the rclpy.spin in a thread. Nothing works.

I tried this alternative approach:

    # ... another method from the previous code
    def run(self):

        self.event_loop = asyncio.get_event_loop()

        # this task will run forever to process ROS node events
        async def rosloop():
            while rclpy.ok():
                rclpy.spin_once(self, timeout_sec=0)
                await asyncio.sleep(0.01)

        # perform main event loop processing
        try:
            asyncio.ensure_future(rosloop())
            self.event_loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            self.event_loop.close()
            self.event_loop = None

def main(args=None):
    rclpy.init(args=args)

    node = TestNode()    
    node.run()
    rclpy.try_shutdown()

Now I get the error:

[ERROR][1679008951.750327026][test_node]: Service call failed! what(): await wasn't used with future

Does anyone know how to handle this? Any asyncio and rclpy expert for advice? I want to stick with a Singlethreaded executor

[UPDATE] PD: If I use the async_timeout package and the alternative approach for spinning the node it works, but I still don't want to depend on an external package

edit retag flag offensive close merge delete

2 Answers

Sort by ยป oldest newest most voted
0

answered 2023-03-17 18:18:25 -0500

charlie92 gravatar image

I find a workaround using asyncio.ensure_future as follows:

class TestNode(Node):
    def __init__(self) -> None:
        Node.__init__(
            self,
            "test_node",
        )
        self.callback_group = ReentrantCallbackGroup()

        self._sub = self.create_subscription(
            topic="test",
            msg_type=Empty,
            callback=self.cb_test,
            qos_profile=5,
            callback_group=self.callback_group,
        )

        self.client = self.create_client(
            ListParameters,
            "/test_node/list_parameters",
            callback_group=MutuallyExclusiveCallbackGroup(),
        )

    def cb_test(self, msg: Empty) -> None:
        # Schedule the async function and add it to the node's callback group.
        asyncio.ensure_future(self.async_cb_test(msg))

    async def async_cb_test(self, msg: Empty) -> None:
        service_request = ListParameters.Request()
        srv_future = self.client.call_async(service_request)
        try:
            result = await asyncio.wait_for(srv_future, timeout=10.0)
        except Exception as e:
            self.get_logger().error(f"Service call failed! what(): {str(e)}")
        else:
            self.get_logger().info(f"Service call success! what(): {str(result)}")


async def spin(executor: SingleThreadedExecutor):
    while rclpy.ok():
        executor.spin_once()
        # Setting the delay to 0 provides an optimized path to allow other tasks to run.
        await asyncio.sleep(0)


def main(args=None):
    rclpy.init(args=args)

    node = TestNode()
    executor = SingleThreadedExecutor()
    executor.add_node(node)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(spin(executor))

    node.destroy_node()
    rclpy.shutdown()
edit flag offensive delete link more
0

answered 2023-03-17 02:02:39 -0500

Per Edwardsson gravatar image

I've had problems of the same sort. I think it comes down to a design problem: awaiting an inner callback inside of an outer callback invites the problem of deadlocks. The way to solve it, I believe, is through callback groups.

Have a look at these links for more information: https://docs.ros.org/en/foxy/How-To-G...

https://github.com/ros2/examples/blob...

edit flag offensive delete link more

Comments

I think it has nothing to do with callback groups but the interaction of asyncio with ROS2 internals The example you shared looks exactly like the code I shared that is working, but without a timeout for the await of the service future

charlie92 gravatar image charlie92  ( 2023-03-17 10:35:53 -0500 )edit

Question Tools

2 followers

Stats

Asked: 2023-03-16 18:08:34 -0500

Seen: 1,613 times

Last updated: Mar 17 '23