How to write a deterministic unit test with rclcpp?
Hello,
Assume I have a "filter node" that subscribe to a data, transform it, and re-publish it.
class Filter : public Node
{
public :
Filter() : Node("filter", "", true)
{
filteredDataPub = create_publisher<Float32>("filtered");
dataIn = create_subscription<Float32>("dataIn", [this](Float32::SharedPtr in) {
auto res = std::make_shared<Float32>();
res->data = in->data * 2;
filteredDataPub->publish(res);
});
}
protected:
Publisher<Float32>::SharedPtr filteredDataPub;
Subscription<Float32>::SharedPtr dataIn;
};
I would like to test it.
So I Wrote the following test :
class TesterNode : public Node
{
public:
TesterNode() : Node("tester", "", true)
{
filteredSubscriber =
create_subscription<Float32>("filtered", [this](Float32::SharedPtr in) {
filteredPromise.set_value(in->data);
});
dataInPublisher = create_publisher<Float32>("dataIn");
}
void publishData(float data)
{
auto res = std::make_shared<Float32>();
res->data = data;
dataInPublisher->publish(res);
}
std::promise<float> filteredPromise;
protected:
rclcpp::Publisher<Float32>::SharedPtr dataInPublisher;
rclcpp::Subscription<Float32>::SharedPtr filteredSubscriber;
};
class TestFixture : public ::testing::Test
{
protected:
void SetUp()
{
testerNode = std::make_shared<TesterNode>();
ASSERT_EQ(testerNode->count_subscribers("dataIn"), static_cast<size_t>(0));
filterNode = std::make_shared<Filter>();
ASSERT_EQ(filterNode->count_publishers("dataIn"), static_cast<size_t>(0));
executor.add_node(filterNode);
executor.add_node(testerNode);
ASSERT_FALSE(rclpp_utils::wait_for_subscriber_and_publisher(testerNode, filterNode, "dataIn", 1000ms));
}
void TearDown()
{
executor.cancel();
executor.remove_node(filterNode);
executor.remove_node(testerNode);
testerNode.reset();
filterNode.reset();
}
std::shared_ptr<TesterNode> testerNode;
rclcpp::executors::SingleThreadedExecutor executor;
private:
std::shared_ptr<Filter> filterNode;
};
TEST_F(TestFixture, weCanFilterOurDatas)
{
auto future = std::shared_future<float>(testerNode->filteredPromise.get_future());
testerNode->publishData(5.0);
ASSERT_EQ(this->executor.spin_until_future_complete(future, std::chrono::seconds(1)),
FutureReturnCode::SUCCESS);
ASSERT_EQ(future.get(), 10.);
}
int main(int argc, char** argv)
{
rclcpp::init(argc, argv);
::testing::InitGoogleTest(&argc, argv);
auto res = RUN_ALL_TESTS();
rclcpp::shutdown();
return res;
}
As you can see "TesterNode" is here to "trigger" the Filter Node, and TestFixture is the boilerplate required to instanciate both nodes.
"wait_for_subscriber_and_publisher" function is a patched version of the one we can found in system_tests. In my version the method return true if there is a timeout, and argument "to be available" is forced to TRUE. And in my version I wait for both publisher and subscriber.
bool wait_for_subscriber_and_publisher(std::shared_ptr<rclcpp::Node> publisherNode,
std::shared_ptr<rclcpp::Node> subscriberNode,
const std::string& topic_name, std::chrono::milliseconds timeout)
{
using std::chrono::duration_cast;
using std::chrono::microseconds;
using std::chrono::steady_clock;
auto start = steady_clock::now();
microseconds time_slept(0);
auto predicate = [&publisherNode, &subscriberNode, &topic_name]() -> bool {
// the subscriber is available if the count is gt 0
return publisherNode->count_subscribers(topic_name) > 0 &&
subscriberNode->count_publishers(topic_name) > 0;
};
while(!predicate() && time_slept < duration_cast<microseconds>(timeout))
{
rclcpp::Event::SharedPtr graph_event = publisherNode->get_graph_event();
publisherNode->wait_for_graph_change(graph_event, 5ms);
time_slept =
duration_cast<std::chrono::microseconds>(steady_clock::now() - start);
}
int64_t time_slept_count =
std::chrono::duration_cast<std::chrono::milliseconds>(time_slept).count();
RCLCPP_INFO(publisherNode->get_logger(),
"Waited %d ms for the subscriber to connect to topic '%s'. Sub "
"count : %d Pub count : %d\n",
time_slept_count, topic_name.c_str(),
publisherNode->count_subscribers(topic_name),
subscriberNode->count_publishers(topic_name));
return !predicate();
}
When I run my test several times, some times test pass, some times test doesn't pass. When test fails, this is because spin_until_future_complete return a TIMEOUT. As you can see I force my nodes to use intra process comunication (passing true as third argument of node constructor). But even if I use the ...