Ask Your Question

Revision history [back]

In the case of Python, you can use get_msg_class from ros2topic.api. I am not sure if it is "the official way" to handle it, but here is an example:

import rclpy
from rclpy.node import Node
from ros2topic.api import get_msg_class


class TestSubscriber(Node):
    def __init__(self):
        super().__init__('test_subscriber')
        message_type = get_msg_class(self, '/topic_name', include_hidden_topics=True)
        print('Message type:', message_type)
        self.create_subscription(message_type, '/topic_name', self.on_message_received, 1)

    def on_message_received(self, message):
        print(message)


def main(args=None):
    rclpy.init(args=args)
    test_subscriber = TestSubscriber()
    rclpy.spin(test_subscriber)
    test_subscriber.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

You can test it by running for example:

ros2 topic pub /topic_name std_msgs/String '{ data: test_value }'

In the case of Python, you can use get_msg_class from ros2topic.api. I am not sure if it is "the official way" to handle it, but here is an example:

import rclpy
from rclpy.node import Node
from ros2topic.api import get_msg_class


class TestSubscriber(Node):
    def __init__(self):
        super().__init__('test_subscriber')
        message_type = get_msg_class(self, '/topic_name', include_hidden_topics=True)
        print('Message type:', message_type)
        self.create_subscription(message_type, '/topic_name', self.on_message_received, 1)

    def on_message_received(self, message):
        print(message)


def main(args=None):
    rclpy.init(args=args)
    test_subscriber = TestSubscriber()
    rclpy.spin(test_subscriber)
    test_subscriber.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

You can test Test it by running for example:e.g.:

ros2 topic pub /topic_name std_msgs/String '{ data: test_value }'

Not completely what are looking for, but here is one way to do in Python. In the case of Python, you can use get_msg_class from ros2topic.api. I am not sure if it is "the official way" to handle it, but here get the message type and then subscribe to the topic. Here is an example:

import rclpy
from rclpy.node import Node
from ros2topic.api import get_msg_class


class TestSubscriber(Node):
    def __init__(self):
        super().__init__('test_subscriber')
        message_type = get_msg_class(self, '/topic_name', include_hidden_topics=True)
        print('Message type:', message_type)
        self.create_subscription(message_type, '/topic_name', self.on_message_received, 1)

    def on_message_received(self, message):
        print(message)


def main(args=None):
    rclpy.init(args=args)
    test_subscriber = TestSubscriber()
    rclpy.spin(test_subscriber)
    test_subscriber.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

Test it by running e.g.:

ros2 topic pub /topic_name std_msgs/String '{ data: test_value }'