To answer my own question... It seems plugins need to be written in C++ as nav2 uses pluginlib where classes are dynamically loaded. You have to inherit from e.g. nav2_core Controller base class, and override all the methods such as activate
etc.
However, there is no reason why the C++ plugin can't be a thin wrapper that calls embedded Python, or even delegates to a separate Python node via a ROS service/action interface.
As requested, here is some code that used to work in Eloquent, but no longer works with Foxy. When I get chance I'll get it working in Foxy and publish the code properly in a git repo.
This is a Python node that will provide a service to compute the cmdvel:
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped
from <your_package>.srv import CalcCmdVel
from my_ros_utils_py.utils import (
pose2d_to_posestamped,
signed_angle_difference,
distance_squared,
distance,
bearing_from
)
DEFAULT_NODE_NAME = 'PCS'
CMDVEL_SERVICE_NAME = "CalcCmdVel"
TARGET_POSE_TOPIC = "target_pose"
LINEAR_PROPORTIONAL_GAIN_PARAM = "linear_velocity_proportional_gain"
ANGULAR_PROPORTIONAL_GAIN_PARAM = "angular_velocity_proportional_gain"
DEFAULT_LINEAR_PROPORTIONAL_GAIN = 1.0
DEFAULT_ANGULAR_PROPORTIONAL_GAIN = 0.5
LINEAR_MAX_ERROR_PARAM = "linear_max_error"
ANGULAR_MAX_ERROR_PARAM = "angular_max_error"
DEFAULT_LINEAR_MAX_ERROR = 1.0
DEFAULT_ANGULAR_MAX_ERROR = 3.14
LINEAR_DEAD_BAND_PARAM = "linear_dead_band"
ANGULAR_DEAD_BAND_PARAM = "angular_dead_band"
DEFAULT_LINEAR_DEAD_BAND = 0.0
DEFAULT_ANGULAR_DEAD_BAND = 0.0
SPOT_TURN_THRESHOLD_PARAM = "spot_turn_threshold"
DEFAULT_SPOT_TURN_THESHOLD = 1.0
LOOKAHEAD_PARAM = "lookahead"
DEFAULT_LOOKAHEAD = 0.2
class PCS(Node):
"""Proportional Controller Service.
Provide cmd_vel computation service.
"""
def __init__(self) -> None:
super().__init__(DEFAULT_NODE_NAME)
self._logger = self.get_logger()
self.declare_parameters(
namespace='',
parameters=[
(LINEAR_PROPORTIONAL_GAIN_PARAM, DEFAULT_LINEAR_PROPORTIONAL_GAIN),
(ANGULAR_PROPORTIONAL_GAIN_PARAM, DEFAULT_ANGULAR_PROPORTIONAL_GAIN),
(LINEAR_MAX_ERROR_PARAM, DEFAULT_LINEAR_MAX_ERROR),
(ANGULAR_MAX_ERROR_PARAM, DEFAULT_ANGULAR_MAX_ERROR),
(LINEAR_DEAD_BAND_PARAM, DEFAULT_LINEAR_DEAD_BAND),
(ANGULAR_DEAD_BAND_PARAM, DEFAULT_ANGULAR_DEAD_BAND),
(SPOT_TURN_THRESHOLD_PARAM, DEFAULT_SPOT_TURN_THESHOLD),
(LOOKAHEAD_PARAM, DEFAULT_LOOKAHEAD)
]
)
self._linear_gain = self.get_parameter(
LINEAR_PROPORTIONAL_GAIN_PARAM
).get_parameter_value().double_value
self._angular_gain = self.get_parameter(
ANGULAR_PROPORTIONAL_GAIN_PARAM
).get_parameter_value().double_value
self._linear_max_error = self.get_parameter(
LINEAR_MAX_ERROR_PARAM
).get_parameter_value().double_value
self._angular_max_error = self.get_parameter(
ANGULAR_MAX_ERROR_PARAM
).get_parameter_value().double_value
self._linear_dead_band = self.get_parameter(
LINEAR_DEAD_BAND_PARAM
).get_parameter_value().double_value
self._angular_dead_band = self.get_parameter(
ANGULAR_DEAD_BAND_PARAM
).get_parameter_value().double_value
self._spot_turn_threshold = self.get_parameter(
SPOT_TURN_THRESHOLD_PARAM
).get_parameter_value().double_value
self._lookahead = self.get_parameter(
LOOKAHEAD_PARAM
).get_parameter_value().double_value
self._cmd_vel_srv = self.create_service(
CalcCmdVel,
CMDVEL_SERVICE_NAME,
self.calc_cmd_vel
)
self._target_pub = self.create_publisher(
PoseStamped,
TARGET_POSE_TOPIC,
1
)
self._logger.info(
"Proportional controller service initialised."
)
def calc_cmd_vel(self, request, response):
"""Compute the velocity command.
Args:
request: see cc_interface/srv/CalcCmdVel
Returns:
response: see cc_interface/srv/CalcCmdVel
"""
path = request.path
pose = request.pose
response.cmd_vel.header.stamp = self.get_clock().now().to_msg()
# Ensure path and pose are in same reference frame
if path.header.frame_id != pose.header.frame_id:
self._logger.error(
"Path frame (\"%s\") not equal to pose (\"%s\")",
path.header.frame_id,
pose.header.frame_id
)
response.cmd_vel.velocity.x = 0.0
response.cmd_vel.velocity.y = 0.0
response.cmd_vel.velocity.theta = 0.0
return response
target_pose = self.get_target_pose(path)
# Angular speed control
target_theta = bearing_from(pose.pose, target_pose)
ctrl_w, err_w = self.calc_ctrl(
pose.pose.theta,
target_theta,
self.angular_error,
self._angular_gain,
self._angular_dead_band,
self._angular_max_error
)
# Linear speed control
dist = 0.0
target_dist = 0.0
if abs(err_w) < self._spot_turn_threshold:
dist = distance(pose.pose, target_pose)
ctrl_v, err_v = self.calc_ctrl(
dist,
target_dist,
self.linear_error,
self._linear_gain,
self._linear_dead_band,
self._linear_max_error
)
# Publish target
target_pose.theta = target_theta
self._target_pub.publish(
pose2d_to_posestamped(
target_pose,
pose.header
)
)
# Controller is feeding back on *distance* so will output a negative
# control value to reduce +ve dist to ...
(more)