ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange
Ask Your Question
1

Does rospy have a get_subscribed_topics() function?

asked 2011-11-09 10:04:31 -0500

updated 2011-11-09 10:04:43 -0500

rospy has rospy.get_published_topics() but doesn't seem to have the analogous function for subscribed topics. Am I missing something?

edit retag flag offensive close merge delete

2 Answers

Sort by ยป oldest newest most voted
1

answered 2013-01-15 14:29:26 -0500

amittleider gravatar image

updated 2013-01-15 14:48:31 -0500

This code should help accomplish what you want. It will simply print out the entire ROSGraph nodes and edges. You can infer subscribers from here.

 #!/usr/bin/env python import roslib; 
 roslib.load_manifest('ssnode') import 
 rospy import roslib 
 import roslib.message
 import rosgraph
 import rostopic 
 from rostopic import ROSTopicHz 
 from rosgraph.impl import graph

 def mygraph():
     g = graph.Graph()
     try:
         while 1:
           g.update()

           if not g.nn_nodes and not g.srvs:
               print("empty")
           else:
               print('\n')
           if g.nn_nodes:
               print('Nodes:')
               for n in g.nn_nodes:
                   prefix = n + '|'
                   print('  ' + n + ' :')
                   print('    Inbound:')
                   for k in g.nn_edges.edges_by_end.iterkeys():
                       if k.startswith(prefix):
                           for c in g.nn_edges.edges_by_end[k]:
                               print('      ' + c.start + ' label = ' + c.label)
                   print('    Outbound:')
                   for k in g.nn_edges.edges_by_start.iterkeys():
                       if k.startswith(prefix):
                           for c in g.nn_edges.edges_by_start[k]:
                               print('      ' + c.end + ' label = ' + c.label)
           if g.srvs:
               print('Services:')
               for s in g.srvs:
                   print('  ' + s)
     except KeyboardInterrupt:
         pass
if __name__ == '__main__':
    try:
        mygraph()
    except rospy.ROSInterruptException: pass
edit flag offensive delete link more
0

answered 2021-09-02 06:04:10 -0500

heuristicus gravatar image

updated 2021-09-02 06:05:54 -0500

An alternative way of doing this without having to look at the graph is to use

    from rosgraph.masterapi import Master
    master = Master("/my_node")
    all_topics = [topic_tuple[0] for topic_tuple in master.getTopicTypes()]
    published_topics = [topic_tuple[0] for topic_tuple in master.getPublishedTopics("/")]
    subscribed_topics = list(set(all_topics) - set(published_topics))
edit flag offensive delete link more

Question Tools

Stats

Asked: 2011-11-09 10:04:31 -0500

Seen: 554 times

Last updated: Sep 02 '21