ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | Q&A answers.ros.org |
![]() | 1 | initial version |
To answer my own question... It seems plugins need to be written in C++ as nav2 uses pluginlib where classes are dynamically loaded. You have to inherit from e.g. nav2_core Controller base class, and override all the methods such as activate
etc.
However, there is no reason why the C++ plugin can't be a thin wrapper that calls embedded Python, or even delegates to a separate Python node via a ROS service/action interface.
![]() | 2 | No.2 Revision |
To answer my own question... It seems plugins need to be written in C++ as nav2 uses pluginlib where classes are dynamically loaded. You have to inherit from e.g. nav2_core Controller base class, and override all the methods such as activate
etc.
However, there is no reason why the C++ plugin can't be a thin wrapper that calls embedded Python, or even delegates to a separate Python node via a ROS service/action interface.
As requested, here is some code that used to work in Eloquent, but no longer works with Foxy. When I get chance I'll get it working in Foxy and publish the code properly in a git repo.
This is a Python node that will provide a service to compute the cmdvel:
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped
from <your_package>.srv import CalcCmdVel
from my_ros_utils_py.utils import (
pose2d_to_posestamped,
signed_angle_difference,
distance_squared,
distance,
bearing_from
)
DEFAULT_NODE_NAME = 'PCS'
CMDVEL_SERVICE_NAME = "CalcCmdVel"
TARGET_POSE_TOPIC = "target_pose"
LINEAR_PROPORTIONAL_GAIN_PARAM = "linear_velocity_proportional_gain"
ANGULAR_PROPORTIONAL_GAIN_PARAM = "angular_velocity_proportional_gain"
DEFAULT_LINEAR_PROPORTIONAL_GAIN = 1.0
DEFAULT_ANGULAR_PROPORTIONAL_GAIN = 0.5
LINEAR_MAX_ERROR_PARAM = "linear_max_error"
ANGULAR_MAX_ERROR_PARAM = "angular_max_error"
DEFAULT_LINEAR_MAX_ERROR = 1.0
DEFAULT_ANGULAR_MAX_ERROR = 3.14
LINEAR_DEAD_BAND_PARAM = "linear_dead_band"
ANGULAR_DEAD_BAND_PARAM = "angular_dead_band"
DEFAULT_LINEAR_DEAD_BAND = 0.0
DEFAULT_ANGULAR_DEAD_BAND = 0.0
SPOT_TURN_THRESHOLD_PARAM = "spot_turn_threshold"
DEFAULT_SPOT_TURN_THESHOLD = 1.0
LOOKAHEAD_PARAM = "lookahead"
DEFAULT_LOOKAHEAD = 0.2
class PCS(Node):
"""Proportional Controller Service.
Provide cmd_vel computation service.
"""
def __init__(self) -> None:
super().__init__(DEFAULT_NODE_NAME)
self._logger = self.get_logger()
self.declare_parameters(
namespace='',
parameters=[
(LINEAR_PROPORTIONAL_GAIN_PARAM, DEFAULT_LINEAR_PROPORTIONAL_GAIN),
(ANGULAR_PROPORTIONAL_GAIN_PARAM, DEFAULT_ANGULAR_PROPORTIONAL_GAIN),
(LINEAR_MAX_ERROR_PARAM, DEFAULT_LINEAR_MAX_ERROR),
(ANGULAR_MAX_ERROR_PARAM, DEFAULT_ANGULAR_MAX_ERROR),
(LINEAR_DEAD_BAND_PARAM, DEFAULT_LINEAR_DEAD_BAND),
(ANGULAR_DEAD_BAND_PARAM, DEFAULT_ANGULAR_DEAD_BAND),
(SPOT_TURN_THRESHOLD_PARAM, DEFAULT_SPOT_TURN_THESHOLD),
(LOOKAHEAD_PARAM, DEFAULT_LOOKAHEAD)
]
)
self._linear_gain = self.get_parameter(
LINEAR_PROPORTIONAL_GAIN_PARAM
).get_parameter_value().double_value
self._angular_gain = self.get_parameter(
ANGULAR_PROPORTIONAL_GAIN_PARAM
).get_parameter_value().double_value
self._linear_max_error = self.get_parameter(
LINEAR_MAX_ERROR_PARAM
).get_parameter_value().double_value
self._angular_max_error = self.get_parameter(
ANGULAR_MAX_ERROR_PARAM
).get_parameter_value().double_value
self._linear_dead_band = self.get_parameter(
LINEAR_DEAD_BAND_PARAM
).get_parameter_value().double_value
self._angular_dead_band = self.get_parameter(
ANGULAR_DEAD_BAND_PARAM
).get_parameter_value().double_value
self._spot_turn_threshold = self.get_parameter(
SPOT_TURN_THRESHOLD_PARAM
).get_parameter_value().double_value
self._lookahead = self.get_parameter(
LOOKAHEAD_PARAM
).get_parameter_value().double_value
self._cmd_vel_srv = self.create_service(
CalcCmdVel,
CMDVEL_SERVICE_NAME,
self.calc_cmd_vel
)
self._target_pub = self.create_publisher(
PoseStamped,
TARGET_POSE_TOPIC,
1
)
self._logger.info(
"Proportional controller service initialised."
)
def calc_cmd_vel(self, request, response):
"""Compute the velocity command.
Args:
request: see cc_interface/srv/CalcCmdVel
Returns:
response: see cc_interface/srv/CalcCmdVel
"""
path = request.path
pose = request.pose
response.cmd_vel.header.stamp = self.get_clock().now().to_msg()
# Ensure path and pose are in same reference frame
if path.header.frame_id != pose.header.frame_id:
self._logger.error(
"Path frame (\"%s\") not equal to pose (\"%s\")",
path.header.frame_id,
pose.header.frame_id
)
response.cmd_vel.velocity.x = 0.0
response.cmd_vel.velocity.y = 0.0
response.cmd_vel.velocity.theta = 0.0
return response
target_pose = self.get_target_pose(path)
# Angular speed control
target_theta = bearing_from(pose.pose, target_pose)
ctrl_w, err_w = self.calc_ctrl(
pose.pose.theta,
target_theta,
self.angular_error,
self._angular_gain,
self._angular_dead_band,
self._angular_max_error
)
# Linear speed control
dist = 0.0
target_dist = 0.0
if abs(err_w) < self._spot_turn_threshold:
dist = distance(pose.pose, target_pose)
ctrl_v, err_v = self.calc_ctrl(
dist,
target_dist,
self.linear_error,
self._linear_gain,
self._linear_dead_band,
self._linear_max_error
)
# Publish target
target_pose.theta = target_theta
self._target_pub.publish(
pose2d_to_posestamped(
target_pose,
pose.header
)
)
# Controller is feeding back on *distance* so will output a negative
# control value to reduce +ve dist to target_dist (0).
# We are controlling velocity, so will want to increase speed
# to decrease distance hence negative sign on ctrl_v
response.cmd_vel.velocity.x = -ctrl_v # Forward is along x
response.cmd_vel.velocity.y = 0.0
response.cmd_vel.velocity.theta = ctrl_w
return response
def get_target_pose(self, path):
"""Get target pose to head for on path.
Args:
path: nav_2d_msgs/Path2D
Returns:
target: geometry_msgs/Pose2D
"""
start_pose = path.poses[0]
target = next(
(pose for pose in path.poses if distance_squared(pose, start_pose) > self._lookahead),
start_pose
)
return target
def calc_error(
self,
value,
set_point,
error_function,
dead_band=0.0,
max_error=float('inf')
):
"""Calculate error using error function pointer,
applying clipping and dead_band filtering to result
Args:
value: float
set_point: float
error_function: function pointer
dead_band: float
max_error: float
Returns:
error: float
"""
error = error_function(value, set_point)
abs_err = abs(error)
max_error = abs(max_error)
if abs_err <= dead_band:
return 0.0
elif abs_err >= max_error:
if error > 0:
return max_error
else:
return -max_error
else:
return error
def linear_error(self, value, set_point):
"""Compute lienar difference between value and setpoint
Args:
value: float
set_point: float
Returns:
diff: float
"""
return set_point - value
def angular_error(self, value, set_point):
"""Difference in angle, limited to range [-pi, pi]
Args:
value: float
set_point: float
Returns:
diff: float
"""
return signed_angle_difference(value, set_point)
def calc_ctrl(
self,
value,
set_point,
error_function,
gain,
dead_band=0.0,
max_error=float('inf')
):
"""Calculate the control value using propotional gain
Args:
value: float
set_point: float
error_function: function pointer
gain: float
dead_band: float
max_error: float
Returns:
ctrl: float
error: float
"""
error = self.calc_error(
value,
set_point,
error_function,
dead_band,
max_error
)
return gain * error, error
def main(args=None):
rclpy.init(args=args)
node = PCS()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Here's the service definition:
nav_2d_msgs/Pose2DStamped pose
nav_2d_msgs/Twist2D velocity
nav_2d_msgs/Path2D path
---
nav_2d_msgs/Twist2DStamped cmd_vel
And here's a snippet from my RPC controller:
/**
* @brief Controller computeVelocityCommands - calculates the best command given the current pose and velocity
*
* It is presumed that the global plan is already set.
*
* This is mostly a wrapper for the protected computeVelocityCommands
* function which has additional debugging info.
*
* @param pose Current robot pose (usually in odom frame)
* @param velocity Current robot velocity
* @return The best command for the robot to drive
*/
geometry_msgs::msg::TwistStamped RPCController::computeVelocityCommands(
const geometry_msgs::msg::PoseStamped & pose,
const geometry_msgs::msg::Twist & velocity
)
{
geometry_msgs::msg::TwistStamped cmd_vel;
cmd_vel.header.stamp = node_->now();
if (!cmdVelClient_->wait_for_service(1s))
{
RCLCPP_WARN(
node_->get_logger(),
"%s service not available",
cmd_vel_srv_name_.c_str()
);
return cmd_vel;
}
auto request = std::make_shared<cc_interface::srv::CalcCmdVel::Request>();
auto pose_2d = nav_2d_utils::poseStampedToPose2D(pose);
auto velocity_2d = nav_2d_utils::twist3Dto2D(velocity);
auto transformed_plan = transformPlan(
tf_,
global_plan_,
pose_2d,
transform_tolerance_,
costmap_,
prune_plan_,
prune_distance_
);
request->pose = pose_2d;
request->velocity = velocity_2d;
request->path = transformed_plan;
auto result_future = cmdVelClient_->async_send_request(request);
nav_2d_msgs::msg::Twist2DStamped cmd_vel_2d;
auto status = result_future.wait_for(1s);
if (status == std::future_status::ready)
{
auto result = result_future.get();
cmd_vel_2d = result->cmd_vel;
cmd_vel.header = cmd_vel_2d.header;
cmd_vel.twist = nav_2d_utils::twist2Dto3D(
cmd_vel_2d.velocity
);
} else {
RCLCPP_ERROR(
node_->get_logger(),
"Failed get cmd_vel"
);
}
return cmd_vel;
}
} // namespace cc_nav2_ext
PLUGINLIB_EXPORT_CLASS(cc_nav2_ext::RPCController, nav2_core::Controller)