ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange
Ask Your Question
11

rospy threading model

asked 2011-03-23 09:06:32 -0500

bhaskara gravatar image

Is it correct to assume that a new thread will be used for every instance of every subscription and service callback?

edit retag flag offensive close merge delete

3 Answers

Sort by ยป oldest newest most voted
8

answered 2011-03-24 05:45:32 -0500

kwc gravatar image

For a particular topic, there is only one thread that all the subscribers share.

Each service callback does get its own thread because there can only be one service callback for a particular service.

Future versions of rospy will likely have a different, more versatile threading model for subscriptions.

edit flag offensive delete link more

Comments

OK. For services, does it use a threadpool or spin up a new thread on each request?
bhaskara gravatar image bhaskara  ( 2011-03-24 06:29:49 -0500 )edit
Each service has a single, dedicated thread that handles the underlying transport and calling into the callback. Service requests are processed serially in the order they come off the socket.
kwc gravatar image kwc  ( 2011-03-24 06:32:47 -0500 )edit
Ken, did you mean 'for a particular node ( or maybe nodehandle?), there is only one thread...'? If I have a python node that subscribes to 5 topics how many threads are in play? What about on the publishing node's side?
Patrick Bouffard gravatar image Patrick Bouffard  ( 2011-03-24 08:29:51 -0500 )edit
3
For each subscribed topic in a node, there is a thread. 5 subscribed topics, 5 threads. If you have two subscriptions to one topic, one thread. Publishers are synchronous/blocking, so the publishing occurs in the same thread. (the i/o engine of rospy is overdue for a rewrite)
kwc gravatar image kwc  ( 2011-03-24 08:34:34 -0500 )edit
Thanks. What about the callbacks? I had the impression that those were executed serially from a single queue for all subscribers.
Patrick Bouffard gravatar image Patrick Bouffard  ( 2011-03-24 08:45:18 -0500 )edit
1
Yes, there is only one thread for a subscription, which includes the work of doing all the callbacks.
kwc gravatar image kwc  ( 2011-03-24 09:36:10 -0500 )edit
What @kwc is describing is the callbacks.
tfoote gravatar image tfoote  ( 2011-03-26 07:53:31 -0500 )edit

The detailed replies have been great, but just to eliminate all ambiguity, from what I understand this also means that my script will wait for my callback to end before starting another callback, even if a subscription releases a new message. Is this correct?

Seanny123 gravatar image Seanny123  ( 2013-12-09 15:23:08 -0500 )edit
8

answered 2018-04-18 14:32:40 -0500

zacwitte gravatar image

It actually seems like a subscriber process has one thread per topic subscribed per publisher. So if node A subscribes to topic /foo and node B publishes to topic /foo and node C also publishes to topic /foo, node A will have 2 callback threads.

ros_thread_sub.py

#!/usr/bin/env python

import threading

import rospy
from std_msgs.msg import String

def callback(msg):
    print("Msg from %s on thread %s" % (msg.data, threading.current_thread()))

def main():
    rospy.init_node('foooo')
    rospy.Subscriber('foo', String, callback, queue_size=1)
    rospy.spin()

if __name__ == '__main__':
    main()

ros_thread_pub.py (starting publisher B and C in separate terminals)

#!/usr/bin/env python

import threading
import sys

import rospy
from std_msgs.msg import String

global proc_id
proc_id = ""

def publisher():
    msg = String(proc_id)
    r = rospy.Rate(1)
    pub = rospy.Publisher('foo', String, queue_size=1)
    while not rospy.is_shutdown():
        print("publishing")
        pub.publish(msg)
        r.sleep()

def main():
    global proc_id
    proc_id = sys.argv[1]
    rospy.init_node(proc_id)
    thread = threading.Thread(target=publisher)
    thread.start()
    rospy.spin()

if __name__ == '__main__':
    main()

Console output:

Msg from B on thread <Thread(/foo, started daemon 140237958694656)>
Msg from B on thread <Thread(/foo, started daemon 140237958694656)>
Msg from B on thread <Thread(/foo, started daemon 140237958694656)>
Msg from B on thread <Thread(/foo, started daemon 140237958694656)>
Msg from C on thread <Thread(/foo, started daemon 140237950301952)>
Msg from B on thread <Thread(/foo, started daemon 140237958694656)>
Msg from C on thread <Thread(/foo, started daemon 140237950301952)>
Msg from B on thread <Thread(/foo, started daemon 140237958694656)>
Msg from C on thread <Thread(/foo, started daemon 140237950301952)>
Msg from B on thread <Thread(/foo, started daemon 140237958694656)>
edit flag offensive delete link more
10

answered 2018-01-25 13:45:48 -0500

The accepted answer is not correct, at least with Kinetic. Consider the following:

#!/usr/bin/env python

import threading

import rospy
from std_msgs.msg import Empty

global i
i = 0


def callback(_):
    global i
    i += 1
    j = i
    while not rospy.is_shutdown():
        print('sleeping: %d, thread: %s' % (j, threading.current_thread()))
        rospy.Rate(1).sleep()
    print('exiting loop for %d: %s' % (j, rospy.is_shutdown()))


def main():
    rospy.init_node('foooo')
    sub = rospy.Subscriber('foo', Empty, callback, queue_size=1)


if __name__ == '__main__':
    main()
    rospy.spin()

Then, call rostopic twice by running the following command twice in parallel in different shell sessions:

rostopic pub /foo std_msgs/Empty "{}"

Observe the following output:

sleeping: 1, thread: <Thread(/foo, started daemon 140177158719232)>
sleeping: 2, thread: <Thread(/foo, started daemon 140177150326528)>
sleeping: 1, thread: <Thread(/foo, started daemon 140177158719232)>
sleeping: 2, thread: <Thread(/foo, started daemon 140177150326528)>
sleeping: 1, thread: <Thread(/foo, started daemon 140177158719232)>
sleeping: 2, thread: <Thread(/foo, started daemon 140177150326528)>
sleeping: 1, thread: <Thread(/foo, started daemon 140177158719232)>
sleeping: 2, thread: <Thread(/foo, started daemon 140177150326528)>
sleeping: 1, thread: <Thread(/foo, started daemon 140177158719232)>
sleeping: 2, thread: <Thread(/foo, started daemon 140177150326528)>
sleeping: 1, thread: <Thread(/foo, started daemon 140177158719232)>
sleeping: 2, thread: <Thread(/foo, started daemon 140177150326528)>
edit flag offensive delete link more

Comments

1

You are right.

kevin.kuei.0321@gmail.com gravatar image kevin.kuei.0321@gmail.com  ( 2018-07-13 04:46:18 -0500 )edit

I don't quite understand why the observed output demonstrates that the accepted answer is incorrect. Of course time may have passed and those two comments are out of sync.

pitosalas gravatar image pitosalas  ( 2023-02-05 20:20:16 -0500 )edit

Question Tools

4 followers

Stats

Asked: 2011-03-23 09:06:32 -0500

Seen: 16,925 times

Last updated: Apr 18 '18