ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange |
1 | initial version |
In the case of Python, you can use get_msg_class
from ros2topic.api
. I am not sure if it is "the official way" to handle it, but here is an example:
import rclpy
from rclpy.node import Node
from ros2topic.api import get_msg_class
class TestSubscriber(Node):
def __init__(self):
super().__init__('test_subscriber')
message_type = get_msg_class(self, '/topic_name', include_hidden_topics=True)
print('Message type:', message_type)
self.create_subscription(message_type, '/topic_name', self.on_message_received, 1)
def on_message_received(self, message):
print(message)
def main(args=None):
rclpy.init(args=args)
test_subscriber = TestSubscriber()
rclpy.spin(test_subscriber)
test_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
You can test it by running for example:
ros2 topic pub /topic_name std_msgs/String '{ data: test_value }'
2 | No.2 Revision |
In the case of Python, you can use get_msg_class
from ros2topic.api
. I am not sure if it is "the official way" to handle it, but here is an example:
import rclpy
from rclpy.node import Node
from ros2topic.api import get_msg_class
class TestSubscriber(Node):
def __init__(self):
super().__init__('test_subscriber')
message_type = get_msg_class(self, '/topic_name', include_hidden_topics=True)
print('Message type:', message_type)
self.create_subscription(message_type, '/topic_name', self.on_message_received, 1)
def on_message_received(self, message):
print(message)
def main(args=None):
rclpy.init(args=args)
test_subscriber = TestSubscriber()
rclpy.spin(test_subscriber)
test_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
You can test Test it by running for example:e.g.:
ros2 topic pub /topic_name std_msgs/String '{ data: test_value }'
3 | No.3 Revision |
Not completely what are looking for, but here is one way to do in Python. In the case of Python, you can use get_msg_class
from ros2topic.api
. I am not sure if it is "the official way" to handle it, but here get the message type and then subscribe to the topic. Here is an example:
import rclpy
from rclpy.node import Node
from ros2topic.api import get_msg_class
class TestSubscriber(Node):
def __init__(self):
super().__init__('test_subscriber')
message_type = get_msg_class(self, '/topic_name', include_hidden_topics=True)
print('Message type:', message_type)
self.create_subscription(message_type, '/topic_name', self.on_message_received, 1)
def on_message_received(self, message):
print(message)
def main(args=None):
rclpy.init(args=args)
test_subscriber = TestSubscriber()
rclpy.spin(test_subscriber)
test_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Test it by running e.g.:
ros2 topic pub /topic_name std_msgs/String '{ data: test_value }'