How to use a ros2 service to trigger an asyncio non-blocking function (without Actions)
The problem I am having is that:
To start an async function in the background I need an asycio event loop.
This event loop usualy exists in the main thread, and when started, blocks the exeuction of that thread (i.e lines of code after starting the event loop aren't run untill the event loop is cancelled).
However, ROS2 has it's own event loop (executor) that also usually runs in the main thread and blocks execution. This means it is difficult to have both event loops running
My attempted sollution was to start the asyncio event loop in a seperate thread. This is started in the Node constructor, and stops after the Node is deconstructed.
This looks like this:
class IncrementPercentDoneServiceNode(Node):
def __create_task(self, f: Awaitable):
self.__task = self.__loop.create_task(f)
def __init__(self):
super().__init__('increment_percent_done_service_node')
self.__loop = asyncio.new_event_loop()
self.__task: Optional[Task] = None
self.__thread = threading.Thread(target=self.__loop.run_forever)
self.__thread.start()
self.done = False
self.create_service(Trigger, 'start_incrementing',
callback=lambda request, responce : (
self.get_logger().info("Starting service"),
self.__loop.call_soon_threadsafe(self.__create_task, self.__increment_percent_complete()),
TriggerResponse(success=True, message='')
)[-1]
)
def __del__(self):
print("stopping loop")
self.done = True
if self.__task is not None:
self.__task.cancel()
self.__loop.stop()
self.__thread.join()
async def __increment_percent_complete(self):
timeout_start = time.time()
duration = 5
while time.time() < (timeout_start + duration):
time_since_start = time.time() - timeout_start
percent_complete = (time_since_start / duration) * 100.0
self.get_logger().info("Percent complete: {}%".format(percent_complete))
await asyncio.sleep(0.5)
self.get_logger().info("leaving async function")
self.done = True
if __name__ == '__main__':
rclpy.init()
test = IncrementPercentDoneServiceNode()
e = MultiThreadedExecutor()
e.add_node(test)
e.spin()
Is this a sensible way to do it? Is there a better way? How would I cancel the start_incrementing
service with another service? (I know that this is what actions are for, but I cannot use them in this instance).
Hi, have you found a way to integrate your events inside ros event loop?
There is some new development that would make this better, but it's unlikely there will be strong support for using a
MultiThreadedExecutor
andasyncio
in the near future.