ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange |
1 | initial version |
import rospy import sys
from importlib import import_module
class Listener(object):
def __init__(self):
self._binary_sub = rospy.Subscriber(
'some_topic', rospy.AnyMsg, self.topic_callback)
def topic_callback(self, data):
assert sys.version_info >= (2,7) #import_module's syntax needs 2.7
connection_header = data._connection_header['type'].split('/')
ros_pkg = connection_header[0] + '.msg'
msg_type = connection_header[1]
print 'Message type detected as ' + msg_type
msg_class = getattr(import_module(ros_pkg), msg_type)
msg = msg_class().deserialize(self._buff)
print data.known_field
Based on article http://schulz-m.github.io/2016/07/18/rospy-subscribe-to-any-msg-type/
2 | No.2 Revision |
import rospy import sys
from importlib import import_module
class Listener(object):
def __init__(self):
self._binary_sub = rospy.Subscriber(
'some_topic', rospy.AnyMsg, self.topic_callback)
def topic_callback(self, data):
assert sys.version_info >= (2,7) #import_module's syntax needs 2.7
connection_header = data._connection_header['type'].split('/')
ros_pkg = connection_header[0] + '.msg'
msg_type = connection_header[1]
print 'Message type detected as ' + msg_type
msg_class = getattr(import_module(ros_pkg), msg_type)
msg = msg_class().deserialize(self._buff)
msg_class().deserialize(data._buff)
print data.known_field
Based on article http://schulz-m.github.io/2016/07/18/rospy-subscribe-to-any-msg-type/