ros2 subscriber to allow a single publisher
Hi,
I understand that ros was not meant to have a single publisher to a topic or maintain a relation ship with a particular publisher. Kindly understand that our setup requires at-most single publisher to a given topic because of its safety critical application and other constraints and the frequency of publishing is 2 ms. Hence we chose publisher/subscriber mode.
Our example node registers its publisher by calling a service with publisher's gid as a payload. Once the client node receives success, publisher should start publishing in the topic (register and then publish). Although other publishers can publish into the same topic but the subscriber handler discards the message based on the registered publisher gid.
It couldn't come with a better way of dealing with this, if you have some suggestions kindly let me know and the node2 side code looks also bit complicated.
RegisterFTSensor.srv
# Note: 'fun' is short for function of the service to be performed
bool FUN_UNREGISTER = 0
bool FUN_REGISTER = 1
# maximum size of the unique id.
uint8 PUBLISHER_GUID_SIZE = 24
# request fields
bool fun
uint8[24] wrench_publisher_gid
---
bool success
Node 1 Server
class FTServer : public rclcpp::Node
{
std::unique_ptr<std::array<uint8_t>[]> registered_gid;
rclcpp::Service<RegisterFTSensor>::SharedPtr externalFTSensorRegistrationSrv;
rclcpp::Subscription<geometry_msgs::msg::WrenchStamped>::SharedPtr externalFTSensorSub;
// service handler
void handleExternalFTSensorRegistrationSrv(const std::shared_ptr<rmw_request_id_t> requestHeader,
const std::shared_ptr<RegisterFTSensor::Request> request,
std::shared_ptr<RegisterFTSensor::Response> response)
{
// pseudo code
if (!registered_gid) {
registered_gid = std::make_unique(std::experimental::make_array(request->wrench_publisher_gid));
.....
}
}
// subscription handler
void handleExternalFTSensorSub(const geometry_msgs::msg::WrenchStamped::SharedPtr wrenchStamped,
const rclcpp::MessageInfo& message_info)
{
// pseudo code
if (info.publisher_gid.data == registered_gid) {
// then process
}
}
}
Node 2 Client
using namespace std::chrono_literals;
RegisterFTSensor::Response::SharedPtr send_request(
rclcpp::Node::SharedPtr node,
rclcpp::Client<RegisterFTSensor>::SharedPtr client,
RegisterFTSensor::Request::SharedPtr request) {
auto result = client->async_send_request(request);
// Wait for the result.
if (rclcpp::spin_until_future_complete(node, result) == rclcpp::FutureReturnCode::SUCCESS) {
return result.get();
} else {
return NULL;
}
}
int main(int argc, char** argv) {
// Force flush of the stdout buffer.
setvbuf(stdout, NULL, _IONBF, BUFSIZ);
// Initialize ROS
rclcpp::init(argc, argv);
auto node = rclcpp::Node::make_shared("ft_talker");
auto publisher = node->create_publisher<geometry_msgs::msg::WrenchStamped>("external_wrench", 10);
auto client = node->create_client<RegisterFTSensor>("register_ft_sensor");
// Obtain publisher gid
const rmw_gid_t& publisher_rmw_gid = publisher->get_gid();
// Create a request using publisher gid
auto request = std::make_shared<srv::RegisterFTSensor::Request>();
request->fun = RegisterFTSensor::Request::FUN_REGISTER;
std::copy(std::begin(publisher_rmw_gid.data), std::end(publisher_rmw_gid.data),
(request->wrench_publisher_gid).begin());
// wait for service available
while (!client->wait_for_service(1s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(node->get_logger(), "Interrupted while waiting for the service. Exiting.");
return 0;
}
RCLCPP_INFO(node->get_logger(), "service not available, waiting again...");
}
// send request and receive result
auto result = send_request(node, client, request);
if (result) {
RCLCPP_INFO_STREAM(node->get_logger(), "FT Sensor is registered: " << result->success);
} else {
RCLCPP_ERROR(node->get_logger(), "Interrupted while waiting for response. Exiting.");
}
// Publisher
bool registered = result->success;
if (registered) {
rclcpp::WallRate loop_rate(2ms);
while (rclcpp::ok()) {
auto message = std::make_unique<geometry_msgs::msg::WrenchStamped>();
// message->header.seq = 123;
message->header.stamp = node->get_clock()->now();
message->header.frame_id = "tool";
// fill other values
try {
publisher->publish(std::move(message));
rclcpp::spin_some(node);
} catch (const rclcpp::exceptions::RCLError& e) {
RCLCPP_ERROR(node->get_logger(), "unexpectedly failed with %s", e.what());
}
loop_rate.sleep();
}
}
rclcpp::shutdown();
return 0;
}
Could you clarify what your question is exactly?
Additionally: I'm not sure exactly what your requirements are, but controlling which nodes are allowed to communicate seems like something SROS2's ACL support could be used for. I haven't tested just that part for something like this, but it could work.
Refer to SROS2: Access Control for an example (you'll probably need to follow the instructions in the preceding sections as well).
The code you show doesn't seem very safe, nor secure. Without encryption, there are various man-in-the-middle possibilities. Denial-of-service also seems possible, due to "other publishers can publish, but msgs will be ignored".
because of the single point of control requirement in the relevant safety standards?
@gvdhoorn, We have multiple ft sensors in our system. Multiple Node 2 - each has a client to a service and a publisher to a topic. plz refer above code . At any given session (can be minute/hour/days), single Node2 publisher messages should be processed by Node 1 discarding the messages from other publishers. In order to achieve this, we are using a publisher gid obtained from message_info of subscriber call back function.
Process :
Node2 calls a service
RegisterFTSensor.srv
with its publishers gid as payload. Once the Node1 receives a service call backhandleExternalFTSensorRegistrationSrv
it saveswrench_publisher_gid
as local variable. After which subscriber callback functionhandleExternalFTSensorSub
use this stored gid to filter messages from specific publisher. After the a while (minutes/hours/days) the client unegister itself by calling the same service with publishers gid. After which any other ftsensor Node 2 can register and repeat the same.@gvdhoorn,
Yes, this is a safety requirement.
Can you explain this a bit more ?
DoS: just flood your subscriber with messages from multiple publishers. Even though the messages are ignored, your callback will still be called, requiring your executor to spend time processing incoming events, your callback to get at least through the
if ( not_a_registered_id )
, etc, etc. That's all processing time which you now can't spend on anything else. I believe if a sufficient amount of traffic is sent to your subscriber, it might get difficult for legitimate messages to be processed in time.MitM: your DDS traffic is not encrypted (or at least: you don't state so). So any attacker could sniff your gid, spoof traffic coming from a publisher with that gid and do whatever they want without your subscriber knowing.
MitM2: attacker sniffs traffic during registration of your pub<->sub pair, intercepts the gid, injects their own and now they control the session.
I'm not a security expert, nor do I claim to be able to do these things immediately, but if you're citing safety regulations and intend for the system you describe to help adhere to those regulations, I believe it's worthwhile to think about this.
Personally I would still take a look at the ACLs supported by SROS2, as they seem to support what you're after, without adding a custom authentication layer at the application level. The main uncertainty I have right now would be whether they can be dynamically updated. But perhaps a question should be whether that should even be allowed.
That would depend on your application, and I don't know what that is (neither do I need to).