ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange
Ask Your Question

Revision history [back]

click to hide/show revision 1
initial version

To answer my own question... It seems plugins need to be written in C++ as nav2 uses pluginlib where classes are dynamically loaded. You have to inherit from e.g. nav2_core Controller base class, and override all the methods such as activateetc.

However, there is no reason why the C++ plugin can't be a thin wrapper that calls embedded Python, or even delegates to a separate Python node via a ROS service/action interface.

To answer my own question... It seems plugins need to be written in C++ as nav2 uses pluginlib where classes are dynamically loaded. You have to inherit from e.g. nav2_core Controller base class, and override all the methods such as activateetc.

However, there is no reason why the C++ plugin can't be a thin wrapper that calls embedded Python, or even delegates to a separate Python node via a ROS service/action interface.


As requested, here is some code that used to work in Eloquent, but no longer works with Foxy. When I get chance I'll get it working in Foxy and publish the code properly in a git repo.

This is a Python node that will provide a service to compute the cmdvel:

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped
from <your_package>.srv import CalcCmdVel
from my_ros_utils_py.utils import (
    pose2d_to_posestamped,
    signed_angle_difference,
    distance_squared,
    distance,
    bearing_from
)

DEFAULT_NODE_NAME = 'PCS'
CMDVEL_SERVICE_NAME = "CalcCmdVel"
TARGET_POSE_TOPIC = "target_pose"
LINEAR_PROPORTIONAL_GAIN_PARAM = "linear_velocity_proportional_gain"
ANGULAR_PROPORTIONAL_GAIN_PARAM = "angular_velocity_proportional_gain"  
DEFAULT_LINEAR_PROPORTIONAL_GAIN = 1.0
DEFAULT_ANGULAR_PROPORTIONAL_GAIN = 0.5
LINEAR_MAX_ERROR_PARAM = "linear_max_error"
ANGULAR_MAX_ERROR_PARAM = "angular_max_error"
DEFAULT_LINEAR_MAX_ERROR = 1.0
DEFAULT_ANGULAR_MAX_ERROR = 3.14
LINEAR_DEAD_BAND_PARAM = "linear_dead_band"
ANGULAR_DEAD_BAND_PARAM = "angular_dead_band"
DEFAULT_LINEAR_DEAD_BAND = 0.0
DEFAULT_ANGULAR_DEAD_BAND = 0.0
SPOT_TURN_THRESHOLD_PARAM = "spot_turn_threshold"
DEFAULT_SPOT_TURN_THESHOLD = 1.0
LOOKAHEAD_PARAM = "lookahead"
DEFAULT_LOOKAHEAD = 0.2


class PCS(Node):
    """Proportional Controller Service.

    Provide cmd_vel computation service.
    """

    def __init__(self) -> None:
        super().__init__(DEFAULT_NODE_NAME)
        self._logger = self.get_logger()

        self.declare_parameters(
            namespace='',
            parameters=[
                (LINEAR_PROPORTIONAL_GAIN_PARAM, DEFAULT_LINEAR_PROPORTIONAL_GAIN),
                (ANGULAR_PROPORTIONAL_GAIN_PARAM, DEFAULT_ANGULAR_PROPORTIONAL_GAIN),
                (LINEAR_MAX_ERROR_PARAM, DEFAULT_LINEAR_MAX_ERROR),
                (ANGULAR_MAX_ERROR_PARAM, DEFAULT_ANGULAR_MAX_ERROR),
                (LINEAR_DEAD_BAND_PARAM, DEFAULT_LINEAR_DEAD_BAND),
                (ANGULAR_DEAD_BAND_PARAM, DEFAULT_ANGULAR_DEAD_BAND),
                (SPOT_TURN_THRESHOLD_PARAM, DEFAULT_SPOT_TURN_THESHOLD),
                (LOOKAHEAD_PARAM, DEFAULT_LOOKAHEAD)
            ]
        )
        self._linear_gain = self.get_parameter(
            LINEAR_PROPORTIONAL_GAIN_PARAM
        ).get_parameter_value().double_value
        self._angular_gain = self.get_parameter(
            ANGULAR_PROPORTIONAL_GAIN_PARAM
        ).get_parameter_value().double_value
        self._linear_max_error = self.get_parameter(
            LINEAR_MAX_ERROR_PARAM
        ).get_parameter_value().double_value
        self._angular_max_error = self.get_parameter(
            ANGULAR_MAX_ERROR_PARAM
        ).get_parameter_value().double_value
        self._linear_dead_band = self.get_parameter(
            LINEAR_DEAD_BAND_PARAM
        ).get_parameter_value().double_value
        self._angular_dead_band = self.get_parameter(
            ANGULAR_DEAD_BAND_PARAM
        ).get_parameter_value().double_value
        self._spot_turn_threshold = self.get_parameter(
            SPOT_TURN_THRESHOLD_PARAM
        ).get_parameter_value().double_value
        self._lookahead = self.get_parameter(
            LOOKAHEAD_PARAM
        ).get_parameter_value().double_value

        self._cmd_vel_srv = self.create_service(
            CalcCmdVel,
            CMDVEL_SERVICE_NAME,
            self.calc_cmd_vel
        )

        self._target_pub = self.create_publisher(
            PoseStamped,
            TARGET_POSE_TOPIC,
            1
        )

        self._logger.info(
            "Proportional controller service initialised."
        )

    def calc_cmd_vel(self, request, response):
        """Compute the velocity command.

        Args:
            request: see cc_interface/srv/CalcCmdVel

        Returns:
            response: see cc_interface/srv/CalcCmdVel
        """
        path = request.path
        pose = request.pose

        response.cmd_vel.header.stamp = self.get_clock().now().to_msg()

        # Ensure path and pose are in same reference frame
        if path.header.frame_id != pose.header.frame_id:
            self._logger.error(
                "Path frame (\"%s\") not equal to pose (\"%s\")",
                path.header.frame_id,
                pose.header.frame_id
            )
            response.cmd_vel.velocity.x = 0.0
            response.cmd_vel.velocity.y = 0.0
            response.cmd_vel.velocity.theta = 0.0
            return response

        target_pose = self.get_target_pose(path)

        # Angular speed control
        target_theta = bearing_from(pose.pose, target_pose)
        ctrl_w, err_w = self.calc_ctrl(
            pose.pose.theta,
            target_theta,
            self.angular_error,
            self._angular_gain,
            self._angular_dead_band,
            self._angular_max_error
        )

        # Linear speed control
        dist = 0.0
        target_dist = 0.0
        if abs(err_w) < self._spot_turn_threshold:
            dist = distance(pose.pose, target_pose)
        ctrl_v, err_v = self.calc_ctrl(
            dist,
            target_dist,
            self.linear_error,
            self._linear_gain,
            self._linear_dead_band,
            self._linear_max_error
        )

        # Publish target
        target_pose.theta = target_theta
        self._target_pub.publish(
            pose2d_to_posestamped(
                target_pose,
                pose.header
            )
        )

        # Controller is feeding back on *distance* so will output a negative
        # control value to reduce +ve dist to target_dist (0).
        # We are controlling velocity, so will want to increase speed
        # to decrease distance hence negative sign on ctrl_v
        response.cmd_vel.velocity.x = -ctrl_v  # Forward is along x
        response.cmd_vel.velocity.y = 0.0
        response.cmd_vel.velocity.theta = ctrl_w

        return response

    def get_target_pose(self, path):
        """Get target pose to head for on path.

        Args:
            path: nav_2d_msgs/Path2D

        Returns:
            target: geometry_msgs/Pose2D
        """
        start_pose = path.poses[0]
        target = next(
            (pose for pose in path.poses if distance_squared(pose, start_pose) > self._lookahead),
            start_pose
        )
        return target

    def calc_error(
        self,
        value,
        set_point,
        error_function,
        dead_band=0.0,
        max_error=float('inf')
    ):
        """Calculate error using error function pointer,
        applying clipping and dead_band filtering to result

        Args:
            value: float
            set_point: float
            error_function: function pointer
            dead_band: float
            max_error: float

        Returns:
            error: float
        """
        error = error_function(value, set_point)
        abs_err = abs(error)
        max_error = abs(max_error)

        if abs_err <= dead_band:
            return 0.0
        elif abs_err >= max_error:
            if error > 0:
                return max_error
            else:
                return -max_error
        else:
            return error

    def linear_error(self, value, set_point):
        """Compute lienar difference between value and setpoint

        Args:
            value: float
            set_point: float

        Returns:
            diff: float
        """
        return set_point - value

    def angular_error(self, value, set_point):
        """Difference in angle, limited to range [-pi, pi]

        Args:
            value: float
            set_point: float

        Returns:
            diff: float
        """
        return signed_angle_difference(value, set_point)

    def calc_ctrl(
        self,
        value,
        set_point,
        error_function,
        gain,
        dead_band=0.0,
        max_error=float('inf')
    ):
        """Calculate the control value using propotional gain

        Args:
            value: float
            set_point: float
            error_function: function pointer
            gain: float
            dead_band: float
            max_error: float

        Returns:
            ctrl: float
            error: float
        """
        error = self.calc_error(
            value,
            set_point,
            error_function,
            dead_band,
            max_error
        )

        return gain * error, error


def main(args=None):
    rclpy.init(args=args)
    node = PCS()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

Here's the service definition:

nav_2d_msgs/Pose2DStamped pose
nav_2d_msgs/Twist2D velocity
nav_2d_msgs/Path2D path
---
nav_2d_msgs/Twist2DStamped cmd_vel

And here's a snippet from my RPC controller:

     /**
     * @brief Controller computeVelocityCommands - calculates the best command given the current pose and velocity
     *
     * It is presumed that the global plan is already set.
     *
     * This is mostly a wrapper for the protected computeVelocityCommands
     * function which has additional debugging info.
     *
     * @param pose Current robot pose (usually in odom frame)
     * @param velocity Current robot velocity
     * @return The best command for the robot to drive
     */
     geometry_msgs::msg::TwistStamped RPCController::computeVelocityCommands(
         const geometry_msgs::msg::PoseStamped & pose,
        const geometry_msgs::msg::Twist & velocity
     )
     {
        geometry_msgs::msg::TwistStamped cmd_vel;
        cmd_vel.header.stamp = node_->now();

        if (!cmdVelClient_->wait_for_service(1s))
        {
            RCLCPP_WARN(
                node_->get_logger(), 
                "%s service not available",
                cmd_vel_srv_name_.c_str()
            );
            return cmd_vel;
        }

        auto request = std::make_shared<cc_interface::srv::CalcCmdVel::Request>();
        auto pose_2d = nav_2d_utils::poseStampedToPose2D(pose);
        auto velocity_2d = nav_2d_utils::twist3Dto2D(velocity);
        auto transformed_plan = transformPlan(
            tf_,
            global_plan_, 
            pose_2d,
            transform_tolerance_,
            costmap_,
            prune_plan_,
            prune_distance_
        );

        request->pose = pose_2d;
        request->velocity = velocity_2d;
        request->path = transformed_plan;

        auto result_future = cmdVelClient_->async_send_request(request);

        nav_2d_msgs::msg::Twist2DStamped cmd_vel_2d;

        auto status = result_future.wait_for(1s);

        if (status == std::future_status::ready)
        {
            auto result = result_future.get();
            cmd_vel_2d = result->cmd_vel; 
            cmd_vel.header = cmd_vel_2d.header;
            cmd_vel.twist = nav_2d_utils::twist2Dto3D(
                cmd_vel_2d.velocity
            );
        } else {
            RCLCPP_ERROR(
                node_->get_logger(), 
                "Failed get cmd_vel"
            );   
        }

        return cmd_vel;
    }
}  // namespace cc_nav2_ext

PLUGINLIB_EXPORT_CLASS(cc_nav2_ext::RPCController, nav2_core::Controller)