ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange |
1 | initial version |
I find a workaround using asyncio.ensure_future
as follows:
class TestNode(Node):
def __init__(self) -> None:
Node.__init__(
self,
"test_node",
)
self.callback_group = ReentrantCallbackGroup()
self._sub = self.create_subscription(
topic="test",
msg_type=Empty,
callback=self.cb_test,
qos_profile=5,
callback_group=self.callback_group,
)
self.client = self.create_client(
ListParameters,
"/test_node/list_parameters",
callback_group=MutuallyExclusiveCallbackGroup(),
)
def cb_test(self, msg: Empty) -> None:
# Schedule the async function and add it to the node's callback group.
asyncio.ensure_future(self.async_cb_test(msg))
async def async_cb_test(self, msg: Empty) -> None:
service_request = ListParameters.Request()
srv_future = self.client.call_async(service_request)
try:
result = await asyncio.wait_for(srv_future, timeout=10.0)
except Exception as e:
self.get_logger().error(f"Service call failed! what(): {str(e)}")
else:
self.get_logger().info(f"Service call success! what(): {str(result)}")
async def spin(executor: SingleThreadedExecutor):
while rclpy.ok():
executor.spin_once()
# Setting the delay to 0 provides an optimized path to allow other tasks to run.
await asyncio.sleep(0)
def main(args=None):
rclpy.init(args=args)
node = TestNode()
executor = SingleThreadedExecutor()
executor.add_node(node)
loop = asyncio.get_event_loop()
loop.run_until_complete(spin(executor))
node.destroy_node()
rclpy.shutdown()