ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange
Ask Your Question

Revision history [back]

click to hide/show revision 1
initial version
qos_profile = QoSProfile(
    reliability=QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
    history=QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
    depth=1
)

sub = Subscriber(
    self,
    Image,
    "rgb_img",
    qos_profile=qos_profile
)
sub.registerCallback(self._on_rgb)

import rclpy from rclpy.node import Node from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy

class MyNode(Node):
    def __init__(self):
        qos_profile = QoSProfile(
     reliability=QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
     history=QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
     depth=1
 )

 sub = Subscriber(
     self,
     Image,
     "rgb_img",
     qos_profile=qos_profile
 )
 sub.registerCallback(self._on_rgb)

    def _on_rgb(self, msg):
        ...

import rclpy from rclpy.node import Node from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy

class MyNode(Node):
    def __init__(self):
        qos_profile = QoSProfile(
            reliability=QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
            history=QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
            depth=1
        )

        sub = Subscriber(
            self,
            Image,
            "rgb_img",
            qos_profile=qos_profile
        )
        sub.registerCallback(self._on_rgb)

    def _on_rgb(self, msg):
        ...

from rclpy.node import Node from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy

class MyNode(Node):
    def __init__(self):
        qos_profile = QoSProfile(
            reliability=QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
            history=QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
            depth=1
        )

        sub = Subscriber(
            self,
            Image,
            "rgb_img",
            qos_profile=qos_profile
        )
        sub.registerCallback(self._on_rgb)

    def _on_rgb(self, msg):
        ...

I've pull this from my code I use to get images from the RS camera (which requires a non-default QoS policy).

from rclpy.node import Node
 from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy

QoSHistoryPolicy

class MyNode(Node):
    def __init__(self):
        qos_profile = QoSProfile(
            reliability=QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
            history=QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
            depth=1
        )

        sub = Subscriber(
            self,
            Image,
            "rgb_img",
            qos_profile=qos_profile
        )
        sub.registerCallback(self._on_rgb)

    def _on_rgb(self, msg):
        ...