ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange |
1 | initial version |
You should separate the callback from the processing. You could for example just let the callback fill a queue with the jobs that are received on the different topics and have a number of threads look for jobs that they can process. I wrote a small example, that receives tasks on data%i-topics, and has a number of threads that do some processing. New tasks can be created via
rostopic pub /data2 std_msgs/Empty "{}" --rate 20 and the result will be shown on /processed
! /usr/bin/python
import rospy from std_msgs.msg import UInt16, Empty import Queue from thread import start_new_thread from threading import Lock
class ThreadClient: def __init__(self): # input queue for jobs self.q = Queue.Queue()
# collect several input topics in single callback for i in range(3): self.sub = rospy.Subscriber("/data"+str(i),
Empty, self.collect_cb)
# create some worker thread that process in incoming data in
parallel for thread_name in range(3): start_new_thread(self.worker, (thread_name, ))
self.new_job_id = 0 self.job_id_lock = Lock() self.pub = rospy.Publisher("/processed", UInt16,
queue_size=100)
def worker(self, thread_id): rospy.loginfo("Starting thread %i", thread_id) while not rospy.is_shutdown(): try: task = self.q.get(block=False) rospy.loginfo("Thread %i is Processing Task %i", thread_id,
task) # do some fancy computation rospy.sleep(1) # use internal queue of publisher (or create new Queue and have a publisher thread) self.pub.publish(UInt16(data=task)) except Queue.Empty: # rospy.loginfo("Nothing to do for thread %i", thread_id) rospy.sleep(0.1)
def collect_cb(self, msg): # create a new ID for this task assert isinstance(msg, Empty) self.job_id_lock.acquire() # not really needed: make sure that
job_ids stay unique next_id = self.new_job_id+1 self.new_job_id += 1 self.job_id_lock.release()
self.q.put(next_id) # uses internal lock rospy.loginfo("Added task %i to the queue", next_id)
if __name__ == "__main__": rospy.init_node("subs") tc = ThreadClient() rospy.spin()
2 | No.2 Revision |
You should separate the callback from the processing. You could for example just let the callback fill a queue with the jobs that are received on the different topics and have a number of threads look for jobs that they can process. I wrote a small example, that receives tasks on data%i-topics, and has a number of threads that do some processing. New tasks can be created via
rostopic pub /data2 std_msgs/Empty "{}" --rate 20
#! /usr/bin/python
import
queue_size=100)
3 | No.3 Revision |
You should separate the callback from the processing. You could for example just let the callback fill a queue with the jobs that are received on the different topics and have a number of threads look for jobs that they can process. I wrote a small example, that receives tasks on data%i-topics, and has a number of threads that do some processing. New tasks can be created via
rostopic pub /data2 std_msgs/Empty "{}" --rate 20
and the result will be shown on /processed
#! /usr/bin/python
import rospy
from std_msgs.msg import UInt16, Empty
import Queue
from thread import start_new_thread
from threading import Lock
class ThreadClient:
def __init__(self):
self.q = Queue.Queue() # input queue for jobs
self.q = Queue.Queue()
# collect several input topics in single callback
for i in range(3):
self.sub = rospy.Subscriber("/data"+str(i), Empty, self.collect_cb)
# create some worker thread that process in incoming data in parallel
for thread_name in range(3):
range(10):
start_new_thread(self.worker, (thread_name, ))
self.new_job_id = 0
self.job_id_lock = Lock()
self.pub = rospy.Publisher("/processed", UInt16, queue_size=100)
def worker(self, thread_id):
rospy.loginfo("Starting thread %i", thread_id)
while not rospy.is_shutdown():
try:
task = self.q.get(block=False)
self.q.get(block=False) # Queue is already thread safe
rospy.loginfo("Thread %i is Processing Task %i", thread_id, task)
rospy.sleep(1) # do some fancy computation
rospy.sleep(1)
# use computation...
self.pub.publish(UInt16(data=task)) # uses internal queue of publisher (or create new Queue and have a publisher thread)
self.pub.publish(UInt16(data=task))
publisher
except Queue.Empty:
# rospy.loginfo("Nothing to do for thread %i", thread_id)
rospy.sleep(0.1)
def collect_cb(self, msg): # create a new ID for this task
assert isinstance(msg, Empty)
self.job_id_lock.acquire() # not really needed: make sure that job_ids stay unique
next_id = self.new_job_id+1
with self.job_id_lock:
self.new_job_id += 1
self.job_id_lock.release()
self.q.put(next_id) # uses internal lock
current_id = self.new_job_id
self.q.put(current_id)
rospy.loginfo("Added task %i to the queue", next_id)
current_id)
if __name__ == "__main__":
rospy.init_node("subs")
tc = ThreadClient()
rospy.spin()
4 | No.4 Revision |
You should separate the callback from the processing. You could for example just let the callback fill a queue with the jobs that are received on the different topics and have a number of threads look for jobs that they can process. I wrote a small example, that receives tasks on data%i-topics, and has a number of threads that do some processing. New tasks can be created via
rostopic pub /data2 std_msgs/Empty "{}" --rate 20
and the result will be shown on /processed
#! /usr/bin/python
import rospy
from std_msgs.msg import UInt16, Empty
import Queue
from thread import start_new_thread
from threading import Lock
class ThreadClient:
def __init__(self):
self.q = Queue.Queue() # input queue for jobs
# collect several input topics in single callback
for i in range(3):
self.sub = rospy.Subscriber("/data"+str(i), Empty, self.collect_cb)
self.new_job_id = 0
self.job_id_lock = Lock()
self.pub = rospy.Publisher("/processed", UInt16, queue_size=100)
# create some worker thread that process in incoming data in parallel
for thread_name in range(10):
start_new_thread(self.worker, (thread_name, ))
self.new_job_id = 0
self.job_id_lock = Lock()
self.pub = rospy.Publisher("/processed", UInt16, queue_size=100)
def worker(self, thread_id):
rospy.loginfo("Starting thread %i", thread_id)
while not rospy.is_shutdown():
try:
task = self.q.get(block=False) # Queue is already thread safe
rospy.loginfo("Thread %i is Processing Task %i", thread_id, task)
rospy.sleep(1) # do some fancy computation...
self.pub.publish(UInt16(data=task)) # uses internal queue of publisher
except Queue.Empty:
rospy.sleep(0.1)
def collect_cb(self, msg): # create a new ID for this task
assert isinstance(msg, Empty)
with self.job_id_lock:
self.new_job_id += 1
current_id = self.new_job_id
self.q.put(current_id)
rospy.loginfo("Added task %i to the queue", current_id)
if __name__ == "__main__":
rospy.init_node("subs")
tc = ThreadClient()
rospy.spin()
5 | No.5 Revision |
You should separate the callback from the processing. You could for example just let the callback fill a queue with the jobs that are received on the different topics and have a number of threads look for jobs that they can process. I wrote a small example, that receives tasks on data%i-topics, and has a number of threads that do some processing. New tasks can be created via
rostopic pub /data2 std_msgs/Empty "{}" --rate 20
and the result will be shown on /processed
#! /usr/bin/python
import rospy
from std_msgs.msg import UInt16, Empty
import Queue
from thread import start_new_thread
from threading import Lock
class ThreadClient:
def __init__(self):
self.q = Queue.Queue() # input queue for jobs
self.new_job_id = 0
self.job_id_lock = Lock()
self.pub = rospy.Publisher("/processed", UInt16, queue_size=100)
# collect several input topics in single callback
for i in range(3):
self.sub = rospy.Subscriber("/data"+str(i), Empty, self.collect_cb)
self.new_job_id = 0
self.job_id_lock = Lock()
self.pub = rospy.Publisher("/processed", UInt16, queue_size=100)
# create some worker thread that process in incoming data in parallel
for thread_name in range(10):
start_new_thread(self.worker, (thread_name, ))
def worker(self, thread_id):
rospy.loginfo("Starting thread %i", thread_id)
while not rospy.is_shutdown():
try:
task = self.q.get(block=False) # Queue is already thread safe
rospy.loginfo("Thread %i is Processing Task %i", thread_id, task)
rospy.sleep(1) # do some fancy computation...
self.pub.publish(UInt16(data=task)) # uses internal queue of publisher
except Queue.Empty:
rospy.sleep(0.1)
def collect_cb(self, msg): # create a new ID for this task
assert isinstance(msg, Empty)
with self.job_id_lock:
self.new_job_id += 1
current_id = self.new_job_id
self.q.put(current_id)
rospy.loginfo("Added task %i to the queue", current_id)
if __name__ == "__main__":
rospy.init_node("subs")
tc = ThreadClient()
rospy.spin()