[ROS2] asyncio await with timeout a service call in a callback
Hi! I am trying to use the asyncio syntax to await for a service to complete inside a callback function of a Node. A dummy node to test this:
#!/usr/bin/env python3
import rclpy
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from std_msgs.msg import Empty
from rcl_interfaces.srv import ListParameters
import asyncio
class TestNode(Node):
def __init__(self) -> None:
Node.__init__(
self,
"test_node",
)
self.callback_group = ReentrantCallbackGroup()
self._sub = self.create_subscription(
topic="test",
msg_type=Empty,
callback=self.cb_test,
qos_profile=5,
callback_group=self.callback_group,
)
self.client = self.create_client(
ListParameters,
"/test_node/list_parameters"
)
async def cb_test(self, msg: Empty) -> None:
service_request = ListParameters.Request()
srv_future = self.client.call_async(service_request)
try:
# result = await asyncio.wait_for(srv_future, timeout=10.0)
result = await srv_future
except Exception as e:
self.get_logger().error(f"Service call failed! what(): {str(e)}")
else:
self.get_logger().info(f"Service call success! what(): {str(result)}")
def main(args=None):
rclpy.init(args=args)
node = TestNode()
executor = SingleThreadedExecutor()
rclpy.spin(node, executor)
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()
You can test it with
ros2 topic pub -1 /test std_msgs/msg/Empty {}
That code works just fine. The callback function doesn't block any other callback functions and it returns the result of the service. Now If I want to add a timeout to that future with asyncio.wait_for(future, timeout=10.0)
it fails with:
[ERROR][1679007966.209838027][test_node]: Service call failed! what(): no running event loop
I tried everything, spawning a new event_loop in a separate thread, calling event_loop.run_forever
in the main thread, and spawning the rclpy.spin
in a thread. Nothing works.
I tried this alternative approach:
# ... another method from the previous code
def run(self):
self.event_loop = asyncio.get_event_loop()
# this task will run forever to process ROS node events
async def rosloop():
while rclpy.ok():
rclpy.spin_once(self, timeout_sec=0)
await asyncio.sleep(0.01)
# perform main event loop processing
try:
asyncio.ensure_future(rosloop())
self.event_loop.run_forever()
except KeyboardInterrupt:
pass
finally:
self.event_loop.close()
self.event_loop = None
def main(args=None):
rclpy.init(args=args)
node = TestNode()
node.run()
rclpy.try_shutdown()
Now I get the error:
[ERROR][1679008951.750327026][test_node]: Service call failed! what(): await wasn't used with future
Does anyone know how to handle this? Any asyncio and rclpy expert for advice? I want to stick with a Singlethreaded executor
[UPDATE] PD: If I use the async_timeout package and the alternative approach for spinning the node it works, but I still don't want to depend on an external package