ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange
Ask Your Question
0

rospy callback subscribe to topics in parallel

asked 2017-05-29 15:47:45 -0500

Mitchell gravatar image

updated 2017-06-01 13:00:00 -0500

I am trying to run a system where my ROS code subscribes to 'n' different topics, each representing one agent. For each agent I need to run a series of calculations (based on the callback function) which I then publish later for a different node to handle. The problem is that the calculations I need to do take some time, and I need the system to run as quickly as possible. What I would like to have happen is for my script to subscribe to each agent and run this calculation parallel (or pretty close to parallel). Here is my pseudo-code:

#!/usr/bin/env python
import numpy as np
from numpy import linalg as LA
from multi_agent_simulation.msg import IndividualPositions, Contains, EntOcc
import rospy
from occupancy_entropy_plot import Aggent_Occupancy_Entropy
import sys
import time
from matplotlib import path
import matplotlib.pyplot as plt

class contains:
    def __init__(self):

        '''
        Initalize Data
        '''

        self.AOE = Aggent_Occupancy_Entropy()
        self.agents = self.AOE.agents
        self.x_min_dist = self.AOE.x_min_dist #distribtuion limits
        self.x_max_dist = self.AOE.x_max_dist
        self.y_min_dist = self.AOE.y_min_dist
        self.y_max_dist = self.AOE.y_max_dist
        self.delta = self.AOE.delta
        self.all_positions_tuple = self.AOE.all_positions_tuple
        self.center_dict = {}
        self.dim_x = 0
        self.dim_y = 0
        self.ent_list = [0]*self.agents
        for i in range(0, self.agents):
            name = 'agent%s' % (i + 1)
            self.center_dict[name] = 0.0
        '''
        Subscribers and publisher
        '''
        self.agent_dictionary = {}
        self.contains_dict = {}
        for i in range(0, self.agents):
            name = 'agent%s' % (i + 1)
            self.agent_dictionary[name] = [0, 0]
            rospy.Subscriber('/' + name, IndividualPositions, self.individual_callback, queue_size=1, buff_size=2**24)             

    def individual_callback(self, msg):
        '''
        DO CALCULATION
        '''




def main():
    while not rospy.is_shutdown():
         '''
         DO STUFF/PUBLISH DATA
         '''


if __name__ == '__main__':
    rospy.init_node('contains')
    C = contains()
    C.main()

So the idea is that the each agent's information would pass through the individual_callback function where I do my calculations, and then the function main() actually publishes it. If I use this code for 1 agent, I can run my calculation at ~20Hz which is perfectly acceptable. But with more agents, say around 20, the speed drops to ~1Hz which is too slow. How can I reorganize my code so that the processing is more parallel? Is that even possible?

Note: I have looked at Python multiprocessing, that was my initial idea. But there seems to be issues using multiprocessing and ROS to calculate and publish data. That was the case for me, anyway.

EDIT: Based on maxsvetlik's suggestion I rewrote the node so that each topic was passed through it's own callback function. Unfortantely, this did not work. Updated code is:

#!/usr/bin/env python

import numpy as np
from numpy import linalg as LA
from multi_agent_simulation.msg import IndividualPositions, Contains, EntOcc
import rospy
from occupancy_entropy_plot import Aggent_Occupancy_Entropy
import sys
import time
from matplotlib import path
import matplotlib.pyplot as plt

class contains:
    def __init__(self):

        '''
        Initalize Data
        '''
        #self.fig1 = plt.figure(1)
        #self.fig1.show()
        self.ent_time = 0.0
        self.dim_x = 0
        self.dim_y = 0
        self.dim_x = 0
        self.dim_y = 0
        for i in range(0 ...
(more)
edit retag flag offensive close merge delete

2 Answers

Sort by ยป oldest newest most voted
0

answered 2017-05-30 11:11:16 -0500

updated 2017-05-31 10:36:49 -0500

When you subscribe to a topic with rospy the callback is given its own thread automatically to process the data from the topic. What seems to be happening here is that all threads are given a single method to do their computation (individual_callback()), forcing each thread to compete for time. This is why you see the roughly linear relationship between calculation speed and number of subscribed agents.

So the parallelism is already happening, but its being counter acted by having only a single processing callback. The solution is to use a unique callback for each subscriber. E.g.

class contains:
def __init__(self):

    self.agent_msgs = [ ] #for speed, this should by a numpy array
    for i in range(0, self.agents):
        name = 'agent%s' % (i + 1)
        self.center_dict[name] = 0.0

def agent_cb0(msg):
      self.agent_msgs[0] = msg

def agent_cb1(msg):
      self.agent_msgs[1] = msg

 ....

 def setup():
    '''
    Subscribers and publisher
    '''
    self.agent_dictionary = {}
    self.contains_dict = {}
    rospy.Subscriber('/agent0', IndividualPositions, self.agent_cb0, queue_size=1)
    rospy.Subscriber('/agent1', IndividualPositions, self.agent_cb1, queue_size=1)
    ...
edit flag offensive delete link more

Comments

Thanks for responding. Unfortunately, that did not work for me. I edited the question above with how I re-wrote the node.

Mitchell gravatar image Mitchell  ( 2017-05-30 12:44:39 -0500 )edit

I'm unfamiliar with how you're generating functions here, and it seems possible to me that the function is just being overwritten, mapping every agent to the same callback (as before). I've updated my answer with code using standalone callbacks. Please try this out and see if it scales

maxsvetlik gravatar image maxsvetlik  ( 2017-05-31 10:39:22 -0500 )edit

Oh yeah I've tried it this way before, the performance is identical.

Mitchell gravatar image Mitchell  ( 2017-06-01 16:14:22 -0500 )edit
0

answered 2017-05-31 14:34:05 -0500

NEngelhard gravatar image

updated 2017-05-31 14:46:51 -0500

You should separate the callback from the processing. You could for example just let the callback fill a queue with the jobs that are received on the different topics and have a number of threads look for jobs that they can process. I wrote a small example, that receives tasks on data%i-topics, and has a number of threads that do some processing. New tasks can be created via

rostopic pub /data2 std_msgs/Empty "{}" --rate 20

and the result will be shown on /processed

#! /usr/bin/python

import rospy
from std_msgs.msg import UInt16, Empty
import Queue
from thread import start_new_thread
from threading import Lock

class ThreadClient:
    def __init__(self):
        self.q = Queue.Queue() # input queue for jobs

        self.new_job_id = 0
        self.job_id_lock = Lock()
        self.pub = rospy.Publisher("/processed", UInt16, queue_size=100)

        # collect several input topics in single callback
        for i in range(3):
            self.sub = rospy.Subscriber("/data"+str(i), Empty, self.collect_cb)

        # create some worker thread that process in incoming data in parallel
        for thread_name in range(10):
            start_new_thread(self.worker, (thread_name, ))


    def worker(self, thread_id):
        rospy.loginfo("Starting thread %i", thread_id)
        while not rospy.is_shutdown():
            try:
                task = self.q.get(block=False)  # Queue is already thread safe
                rospy.loginfo("Thread %i is Processing Task %i", thread_id, task)
                rospy.sleep(1) # do some fancy computation...
                self.pub.publish(UInt16(data=task))  # uses internal queue of publisher
            except Queue.Empty:
                rospy.sleep(0.1)

    def collect_cb(self, msg):  # create a new ID for this task
        assert isinstance(msg, Empty)
        with self.job_id_lock:
            self.new_job_id += 1
            current_id = self.new_job_id

        self.q.put(current_id)
        rospy.loginfo("Added task %i to the queue", current_id)


if __name__ == "__main__":
    rospy.init_node("subs")
    tc = ThreadClient()
    rospy.spin()
edit flag offensive delete link more

Comments

Thanks for answering, but that unfortunately has not worked for me yet. See Edit2.

Mitchell gravatar image Mitchell  ( 2017-06-01 13:00:18 -0500 )edit

You of course have to adapt the worker-function. In the callback, you only(!) put the task in the queue.

NEngelhard gravatar image NEngelhard  ( 2017-06-01 13:23:04 -0500 )edit

Yes I know, this is just a practice code. What I am talking about is simply how fast the callback can actually run. So the process of grabbing the String message and running those commands shown in collect_cb decreases with the number of subscribers.

Mitchell gravatar image Mitchell  ( 2017-06-01 14:12:07 -0500 )edit

But why do run any calculation in the callback?

NEngelhard gravatar image NEngelhard  ( 2017-06-01 14:17:25 -0500 )edit

I guess I was thinking that it would be better to run the calculation within the callback. I adjusted my code and ran the computation in the worker, but this still results in slowed frequencies.

Mitchell gravatar image Mitchell  ( 2017-06-01 14:28:29 -0500 )edit

So the reason this method is not working for me is the computational time gets added per topic used. The rate of the '/processed' topic does not vary when more topics are added, but the rate of each worker thread decreases, and that is the problem.

Mitchell gravatar image Mitchell  ( 2017-06-01 17:41:11 -0500 )edit

Question Tools

1 follower

Stats

Asked: 2017-05-29 15:47:45 -0500

Seen: 3,586 times

Last updated: Jun 01 '17