ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange
Ask Your Question
2

async/await in subscriber callback

asked 2020-09-30 17:57:19 -0500

chives_onion gravatar image

I have a ros node that needs to communicate with a non-ros over a tcp asynchronous socket connection. This node will be subscribed to a variety of topics and needs to send data over the socket connection when a message is received (by calling the socket writer within the subscriber callback).

My issue is that I cannot use async/await syntax for a subscriber callback since rospy.Subscriber() does not allow the callback to be awaited.

Ubuntu20 Ros Noetic Python 3.8.2

Here's a MWE:

A generic publisher, representing one of several nodes generating data I eventually want to send over the socket:

 #!/usr/bin/env python3

import rospy 
from std_msgs.msg import String

def sample_publisher():
    rospy.init_node('question_pub')
    pub = rospy.Publisher('topic_name', String, queue_size=10)
    rate = rospy.Rate(1)
    msg = String(data="sample message")

    rospy.loginfo("starting publish loop")
    while not rospy.is_shutdown():
        pub.publish(msg)
        rospy.loginfo("finished pub'in")
        rate.sleep()

if __name__ == '__main__':
    sample_publisher()

The subscriber in question:

#!/usr/bin/env python3


import rospy
from std_msgs.msg import String
import asyncio

class listener_handle:
    def __init__(self):
        rospy.init_node('question_sub')
        self.server_address = '127.0.0.1'
        self.port = 51222

    def activate_subscribers(self):
        #separated to ensure async writer is defined before first message
        rospy.Subscriber("topic_name", String, self.callback)

    async def connect2server(self):
        self.reader, self.writer = await asyncio.open_connection(self.server_address, self.port)

    async def async_spin(self):
        rospy.loginfo("beginning node spin")
        while not rospy.is_shutdown():
            #want bidirectional messages
            recvData = await self.reader.read(100)
            rospy.loginfo("got {} from socket server".format(recvData.decode()))

    async def callback(self, data):
        rospy.loginfo("callback received value of {}".format(data.data))
        self.writer.write(data.data.encode())
        await self.writer.drain()


async def main():
    h = listener_handle()
    await h.connect2server()
    h.activate_subscribers()
    await h.async_spin()

if __name__ == '__main__':
    asyncio.run(main())

which errors on startup with

/opt/ros/noetic/lib/python3/dist-packages/rospy/topics.py:750: RuntimeWarning: coroutine 'listener_handle.callback' was never awaited
  cb(msg)

and, for reference, the socket server I'm trying to connect to:

#!/usr/bin/env python3

import asyncio

async def handle_echo(reader, writer):
    print("client connected, waiting for msg")
    data = await reader.read(100)
    message = data.decode()

    print("Received {}".format(message))

    writer.write(data)
    #immediately send back to subscriber client
    await writer.drain()
    print("Close the connection")
    writer.close()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 51222)

    print("server started up")
    async with server:
        await server.serve_forever()

asyncio.run(main())

I've attempted to make the subscriber asynchronous but it is not awaitable.

I know the subscribers themselves work asynchronously, but I looked for a few hours and was unable to find any examples using the asyncio library like this.

How can I use await within a rospy subscriber callback? Thanks in advance!

edit retag flag offensive close merge delete

2 Answers

Sort by ยป oldest newest most voted
3

answered 2020-10-01 12:27:46 -0500

jdlangs gravatar image

You can't define the callback with async because rospy isn't set up to call callbacks as coroutines. What you can do instead is manually create an asynchronous task with create_task instead of await inside the callback. Importantly, callbacks are invoked by rospy in a new thread, so you will need to explicitly access the main event loop created by asyncio.run in the main thread. Probably in your object constructor you can store the output of asyncio.get_running_loop() as a member variable (e.g., self._event_loop). Then in your callback, use self._event_loop.create_task(self.writer.drain()) instead of await. This will be slightly different because the callback function won't wait for drain to complete before returning itself. I was unable to find a simple way to wait for a task to complete when not inside a coroutine, but there is probably some way.

Alternatively, because callback is running in a separate thread, you can actually just run asyncio.run(drain()), since there isn't an event loop yet running in that thread. This is probably not advisable though since you don't need to be creating a new event loop for every callback and there may be race issues with other coroutines running in the main event loop. But maybe it would work fine for your situation.

edit flag offensive delete link more

Comments

This approach appears to be working well with my full application (multiple callbacks with high data throughput). Thank you for the help!

chives_onion gravatar image chives_onion  ( 2020-10-02 12:29:17 -0500 )edit
0

answered 2021-10-20 13:27:36 -0500

mmoerdijk gravatar image

In the call back you should use run_coroutine_threadsafeSee: https://docs.python.org/3/library/asy...

edit flag offensive delete link more

Question Tools

2 followers

Stats

Asked: 2020-09-30 17:57:19 -0500

Seen: 4,535 times

Last updated: Oct 20 '21