ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange
Ask Your Question

Revision history [back]

click to hide/show revision 1
initial version

You should separate the callback from the processing. You could for example just let the callback fill a queue with the jobs that are received on the different topics and have a number of threads look for jobs that they can process. I wrote a small example, that receives tasks on data%i-topics, and has a number of threads that do some processing. New tasks can be created via

rostopic pub /data2 std_msgs/Empty "{}" --rate 20 and the result will be shown on /processed

! /usr/bin/python

import rospy from std_msgs.msg import UInt16, Empty import Queue from thread import start_new_thread from threading import Lock

class ThreadClient: def __init__(self): # input queue for jobs self.q = Queue.Queue()

    # collect several input topics in single callback
    for i in range(3):
        self.sub = rospy.Subscriber("/data"+str(i),

Empty, self.collect_cb)

    # create some worker thread that process in incoming data in

parallel for thread_name in range(3): start_new_thread(self.worker, (thread_name, ))

    self.new_job_id = 0
    self.job_id_lock = Lock()
    self.pub = rospy.Publisher("/processed", UInt16,

queue_size=100)

def worker(self, thread_id):
    rospy.loginfo("Starting thread %i", thread_id)
    while not rospy.is_shutdown():
        try:
            task = self.q.get(block=False)
            rospy.loginfo("Thread %i is Processing Task %i", thread_id,

task) # do some fancy computation rospy.sleep(1) # use internal queue of publisher (or create new Queue and have a publisher thread) self.pub.publish(UInt16(data=task)) except Queue.Empty: # rospy.loginfo("Nothing to do for thread %i", thread_id) rospy.sleep(0.1)

def collect_cb(self, msg):  # create a new ID for this task
    assert isinstance(msg, Empty)
    self.job_id_lock.acquire()  # not really needed: make sure that

job_ids stay unique next_id = self.new_job_id+1 self.new_job_id += 1 self.job_id_lock.release()

    self.q.put(next_id)  # uses internal lock
    rospy.loginfo("Added task %i to the queue", next_id)

if __name__ == "__main__": rospy.init_node("subs") tc = ThreadClient() rospy.spin()

You should separate the callback from the processing. You could for example just let the callback fill a queue with the jobs that are received on the different topics and have a number of threads look for jobs that they can process. I wrote a small example, that receives tasks on data%i-topics, and has a number of threads that do some processing. New tasks can be created via

rostopic pub /data2 std_msgs/Empty "{}" --rate 20
 

and the result will be shown on /processed

! /usr/bin/python

#! /usr/bin/python

import rospy rospy
from std_msgs.msg import
  import UInt16, Empty Empty
import Queue Queue
from thread
  thread import start_new_thread start_new_thread
from threading
  threading import Lock

Lock class ThreadClient: def __init__(self): # input queue for jobs self.q = Queue.Queue()

Queue.Queue()

        # collect several input topics in single callback
     for i in range(3):
         self.sub = rospy.Subscriber("/data"+str(i),

rospy.Subscriber("/data"+str(i), Empty, self.collect_cb)

self.collect_cb)

        # create some worker thread that process in incoming data in

in parallel for thread_name in range(3): start_new_thread(self.worker, start_new_thread(self.worker, (thread_name, ))

))

        self.new_job_id = 0
     self.job_id_lock = Lock()
     self.pub = rospy.Publisher("/processed", UInt16,

queue_size=100)

UInt16, queue_size=100)

    def worker(self, thread_id):
     rospy.loginfo("Starting thread %i", thread_id)
     while not rospy.is_shutdown():
         try:
             task = self.q.get(block=False)
             rospy.loginfo("Thread %i is Processing Task %i", thread_id,

thread_id, task) # do some fancy computation rospy.sleep(1) # use internal queue of publisher (or create new Queue and and have a publisher thread) self.pub.publish(UInt16(data=task)) except Queue.Empty: # rospy.loginfo("Nothing to do for for thread %i", thread_id) rospy.sleep(0.1)

rospy.sleep(0.1)

    def collect_cb(self, msg):  # create a new ID for this task
     assert isinstance(msg, Empty)
     self.job_id_lock.acquire()  # not really needed: make sure that

that job_ids stay unique next_id = self.new_job_id+1 self.new_job_id += 1 self.job_id_lock.release()

self.job_id_lock.release()

        self.q.put(next_id)  # uses internal lock
     rospy.loginfo("Added task %i to the queue", next_id)

if __name__ == "__main__": rospy.init_node("subs") tc = ThreadClient() rospy.spin()

rospy.spin()

You should separate the callback from the processing. You could for example just let the callback fill a queue with the jobs that are received on the different topics and have a number of threads look for jobs that they can process. I wrote a small example, that receives tasks on data%i-topics, and has a number of threads that do some processing. New tasks can be created via

rostopic pub /data2 std_msgs/Empty "{}" --rate 20

and the result will be shown on /processed

#! /usr/bin/python

import rospy
from std_msgs.msg import UInt16, Empty
import Queue
from thread import start_new_thread
from threading import Lock


class ThreadClient:
    def __init__(self):
        self.q = Queue.Queue() # input queue for jobs
        self.q = Queue.Queue()

        # collect several input topics in single callback
        for i in range(3):
            self.sub = rospy.Subscriber("/data"+str(i), Empty, self.collect_cb)

        # create some worker thread that process in incoming data in parallel
        for thread_name in range(3):
range(10):
            start_new_thread(self.worker, (thread_name, ))

        self.new_job_id = 0
        self.job_id_lock = Lock()
        self.pub = rospy.Publisher("/processed", UInt16, queue_size=100)

    def worker(self, thread_id):
        rospy.loginfo("Starting thread %i", thread_id)
        while not rospy.is_shutdown():
            try:
                task = self.q.get(block=False)
self.q.get(block=False)  # Queue is already thread safe
                rospy.loginfo("Thread %i is Processing Task %i", thread_id, task)
                rospy.sleep(1) # do some fancy computation
                rospy.sleep(1)
                # use computation...
                self.pub.publish(UInt16(data=task))  # uses internal queue of publisher (or create new Queue and have a publisher thread)
                self.pub.publish(UInt16(data=task))
publisher
            except Queue.Empty:
                # rospy.loginfo("Nothing to do for thread %i", thread_id)
                rospy.sleep(0.1)

    def collect_cb(self, msg):  # create a new ID for this task
        assert isinstance(msg, Empty)
        self.job_id_lock.acquire()  # not really needed: make sure that job_ids stay unique
        next_id = self.new_job_id+1
with self.job_id_lock:
            self.new_job_id += 1
        self.job_id_lock.release()

        self.q.put(next_id)  # uses internal lock
    current_id = self.new_job_id

        self.q.put(current_id)
        rospy.loginfo("Added task %i to the queue", next_id)
current_id)


if __name__ == "__main__":
    rospy.init_node("subs")
    tc = ThreadClient()
    rospy.spin()

You should separate the callback from the processing. You could for example just let the callback fill a queue with the jobs that are received on the different topics and have a number of threads look for jobs that they can process. I wrote a small example, that receives tasks on data%i-topics, and has a number of threads that do some processing. New tasks can be created via

rostopic pub /data2 std_msgs/Empty "{}" --rate 20

and the result will be shown on /processed

#! /usr/bin/python

import rospy
from std_msgs.msg import UInt16, Empty
import Queue
from thread import start_new_thread
from threading import Lock

 class ThreadClient:
    def __init__(self):
        self.q = Queue.Queue() # input queue for jobs

        # collect several input topics in single callback
        for i in range(3):
            self.sub = rospy.Subscriber("/data"+str(i), Empty, self.collect_cb)

        self.new_job_id = 0
        self.job_id_lock = Lock()
        self.pub = rospy.Publisher("/processed", UInt16, queue_size=100)

       # create some worker thread that process in incoming data in parallel
        for thread_name in range(10):
            start_new_thread(self.worker, (thread_name, ))

        self.new_job_id = 0
        self.job_id_lock = Lock()
        self.pub = rospy.Publisher("/processed", UInt16, queue_size=100)

    def worker(self, thread_id):
        rospy.loginfo("Starting thread %i", thread_id)
        while not rospy.is_shutdown():
            try:
                task = self.q.get(block=False)  # Queue is already thread safe
                rospy.loginfo("Thread %i is Processing Task %i", thread_id, task)
                rospy.sleep(1) # do some fancy computation...
                self.pub.publish(UInt16(data=task))  # uses internal queue of publisher
            except Queue.Empty:
                rospy.sleep(0.1)

    def collect_cb(self, msg):  # create a new ID for this task
        assert isinstance(msg, Empty)
        with self.job_id_lock:
            self.new_job_id += 1
            current_id = self.new_job_id

        self.q.put(current_id)
        rospy.loginfo("Added task %i to the queue", current_id)


if __name__ == "__main__":
    rospy.init_node("subs")
    tc = ThreadClient()
    rospy.spin()

You should separate the callback from the processing. You could for example just let the callback fill a queue with the jobs that are received on the different topics and have a number of threads look for jobs that they can process. I wrote a small example, that receives tasks on data%i-topics, and has a number of threads that do some processing. New tasks can be created via

rostopic pub /data2 std_msgs/Empty "{}" --rate 20

and the result will be shown on /processed

#! /usr/bin/python

import rospy
from std_msgs.msg import UInt16, Empty
import Queue
from thread import start_new_thread
from threading import Lock

class ThreadClient:
    def __init__(self):
        self.q = Queue.Queue() # input queue for jobs

        self.new_job_id = 0
        self.job_id_lock = Lock()
        self.pub = rospy.Publisher("/processed", UInt16, queue_size=100)

        # collect several input topics in single callback
        for i in range(3):
            self.sub = rospy.Subscriber("/data"+str(i), Empty, self.collect_cb)

        self.new_job_id = 0
        self.job_id_lock = Lock()
        self.pub = rospy.Publisher("/processed", UInt16, queue_size=100)

       # create some worker thread that process in incoming data in parallel
        for thread_name in range(10):
            start_new_thread(self.worker, (thread_name, ))


    def worker(self, thread_id):
        rospy.loginfo("Starting thread %i", thread_id)
        while not rospy.is_shutdown():
            try:
                task = self.q.get(block=False)  # Queue is already thread safe
                rospy.loginfo("Thread %i is Processing Task %i", thread_id, task)
                rospy.sleep(1) # do some fancy computation...
                self.pub.publish(UInt16(data=task))  # uses internal queue of publisher
            except Queue.Empty:
                rospy.sleep(0.1)

    def collect_cb(self, msg):  # create a new ID for this task
        assert isinstance(msg, Empty)
        with self.job_id_lock:
            self.new_job_id += 1
            current_id = self.new_job_id

        self.q.put(current_id)
        rospy.loginfo("Added task %i to the queue", current_id)


if __name__ == "__main__":
    rospy.init_node("subs")
    tc = ThreadClient()
    rospy.spin()