How to use a ros2 service to trigger an asyncio non-blocking function (without Actions)

asked 2020-11-27 04:11:09 -0600

CraigH92

The problem I am having is that:

  • To start an async function in the background I need an asyncio event loop.

  • This event loop usually lives in the main thread and, once started, blocks execution of that thread (i.e. lines of code after the call that starts the event loop aren't run until the loop is stopped).

  • However, ROS 2 has its own event loop (the executor), which also usually runs in the main thread and blocks execution. This makes it difficult to have both event loops running at the same time.
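The blocking behaviour described in the second bullet can be seen without ROS 2 at all. The following is a minimal sketch using only the standard library; the function name `demo_blocking` and the `events` list are illustrative and not part of the original code.

```python
import asyncio


def demo_blocking() -> list:
    """Record the order in which code runs around loop.run_forever()."""
    events = []
    loop = asyncio.new_event_loop()

    def stop_from_inside() -> None:
        # Runs on the loop itself, 50 ms after the loop starts.
        events.append("inside loop")
        loop.stop()

    loop.call_later(0.05, stop_from_inside)

    events.append("before run_forever")
    loop.run_forever()  # the calling thread is blocked here until stop()
    events.append("after run_forever")

    loop.close()
    return events
```

The line after `run_forever()` only executes once something running on the loop calls `stop()`. An executor's `spin()` occupies its thread in the same way, which is why the two cannot simply be started one after the other in the main thread.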

My attempted solution was to start the asyncio event loop in a separate thread. The thread is started in the Node constructor and stopped when the Node is destroyed.

The code looks like this:

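import asyncio
import threading
import time
from asyncio import Task
from typing import Coroutine, Optional

import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from std_srvs.srv import Trigger  # assumed: the standard std_srvs Trigger service


class IncrementPercentDoneServiceNode(Node):

    def __create_task(self, f: Coroutine):
        # Must run on the asyncio thread: loop.create_task() is not thread-safe.
        self.__task = self.__loop.create_task(f)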

    def __init__(self):

        super().__init__('increment_percent_done_service_node')

        # The asyncio loop gets its own thread so it does not block the executor.
        self.__loop = asyncio.new_event_loop()
        self.__task: Optional[Task] = None
        self.__thread = threading.Thread(target=self.__loop.run_forever)
        self.__thread.start()
        self.done = False

        self.create_service(Trigger, 'start_incrementing',
                            self.__start_incrementing)

    def __start_incrementing(self, request, response):
        # rclpy passes in a pre-built response object; fill it in and return it.
        self.get_logger().info("Starting service")
        # Hand the coroutine over to the asyncio thread.
        self.__loop.call_soon_threadsafe(
            self.__create_task, self.__increment_percent_complete())
        response.success = True
        response.message = ''
        return response

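    def __del__(self):
        print("stopping loop")
        self.done = True
        # The loop runs in another thread, so cancel() and stop() are
        # scheduled on that thread instead of being called directly.
        if self.__task is not None:
            self.__loop.call_soon_threadsafe(self.__task.cancel)

        self.__loop.call_soon_threadsafe(self.__loop.stop)
        self.__thread.join()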

    async def __increment_percent_complete(self):
        timeout_start = time.time()        
        duration = 5

        while time.time() < (timeout_start + duration):
            time_since_start = time.time() - timeout_start
            percent_complete = (time_since_start / duration) * 100.0
            self.get_logger().info("Percent complete: {}%".format(percent_complete))
            await asyncio.sleep(0.5)

        self.get_logger().info("leaving async function")
        self.done = True

if __name__ == '__main__':

    rclpy.init()
    test = IncrementPercentDoneServiceNode()
    e = MultiThreadedExecutor()
    e.add_node(test)
    e.spin()

Is this a sensible way to do it? Is there a better way? How would I cancel the start_incrementing service with another service? (I know that this is what actions are for, but I cannot use them in this instance).
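As one possible shape for the cancellation part of the question, here is a ROS-free sketch. It swaps the `call_soon_threadsafe` plus `create_task` pair used above for `asyncio.run_coroutine_threadsafe`, which returns a `concurrent.futures.Future` that can be cancelled from any thread. The class `BackgroundIncrementer` and its method names are hypothetical; `start()` and `cancel()` stand in for the bodies of the `start_incrementing` callback and of a second, hypothetical cancel service callback. The service wiring itself is left out.

```python
import asyncio
import concurrent.futures
import threading
from typing import Optional


class BackgroundIncrementer:
    """Hypothetical helper that owns an asyncio loop in a background thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self._loop.run_forever, daemon=True)
        self._thread.start()
        self._future: Optional[concurrent.futures.Future] = None
        self.ticks = 0

    async def _increment(self, steps: int, period: float) -> None:
        # Stand-in for the long-running async work.
        for _ in range(steps):
            self.ticks += 1
            await asyncio.sleep(period)

    def start(self, steps: int = 10, period: float = 0.5) -> bool:
        # Would be called from the 'start_incrementing' service callback.
        if self._future is not None and not self._future.done():
            return False  # a run is already in progress
        self._future = asyncio.run_coroutine_threadsafe(
            self._increment(steps, period), self._loop)
        return True

    def cancel(self) -> bool:
        # Would be called from a second (hypothetical) cancel service callback.
        if self._future is None or self._future.done():
            return False  # nothing to cancel
        return self._future.cancel()

    def shutdown(self) -> None:
        # Cancel any running work, then stop the loop from its own thread.
        self.cancel()
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join(timeout=5)
```

In a node, each service callback would call `start()` or `cancel()` and copy the returned boolean into `response.success`. Whether this fits depends on the constraints that rule out actions here, so treat it as a starting point rather than a recommendation.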


Comments

Hi, have you found a way to integrate your events into the ROS event loop?

flash_27 (2021-09-30 11:18:36 -0600)

There is some new development that would make this better, but it's unlikely there will be strong support for using a MultiThreadedExecutor and asyncio in the near future.

achille (2023-04-11 13:20:04 -0600)