Test a node that subscribes and publishes to other nodes
Hi,
I have to test that a truck goes over a set of points that are provided in a JSON file. My ROS2 project contains several packages like State Machine, Planning Problem or Simulation. I currently have a node in planning_problem that receives current location of the truck from the state machine and publishes next goal location after asking the user where he/she wants to go. In order to test the whole system I have thought about creating a node very similar to this one but that already has a set of points saved and call it instead of the original one. So I would subscribe to the state machine topic to know my current location, assert that is the desired one and publish the new one until reaching the goal. The problem is how to combine subscribers, publishers and tests at the same time.
My new node process_node.py
looks like this:
#!/usr/bin/env python
import sys
import unittest
import json
from rclpy.node import Node
from std_msgs.msg import Float32MultiArray
import rclpy
from custom_msg.msg import CustomMsg
from gui_pkg.gui_window import gui_hpp
class ProcessNode(unittest.TestCase):
def __init__(self, *args):
rclpy.init(args=*args)
self.node = rclpy.create_node('process_node')
# publish goal pose
self._plan_prob_publisher = self.node.create_publisher(Float32MultiArray, 'send_goal_pose')
self.gui_window = gui_hpp() # another class useful for some methods it contains
self.current_loc = []
self.answered = False
with open("../../launch/launch/system_test/test_positions.json") as json_file:
self.manual_positions = json.load(json_file)
self.obtain_array_positions()
self.next_pose = [0, 0, 0, 0]
def obtain_array_positions(self):
self.gui_window.multiple_path = [] # where we concatenate all the positions that required to be done for that slot
for coordinate in self.manual_positions:
self.gui_window.multiple_path.append(
[
float(coordinate["x"]),
float(coordinate["y"]),
float(coordinate["orientation"]),
float(coordinate["h_angle"])
]
)
def listener_callback(self, msg):
self.current_loc = msg.coordinates
self.answered = True
def test_correct_positions(self):
self._cur_loc_subscriber = self.node.create_subscription(CustomMsg, 'request_user_ip', self.listener_callback)
while self.gui_window.is_multiple_path(): # iterate for all the steps
self.node.loop.run_until_complete(self.wait_user_ip_callback()) # wait until receiving current location from SM
self.assertEqual(self.current_loc, self.next_pose) # assert current location is equal to the goal position from previous step
self.next_pose = self.gui_window.get_next_pose() # I get the next goal and remove the previous one from memory
self.answered = False
self.publish(self.next_pose) # publish the new goal
# keep checking if message is received
async def check_user_status(self):
await asyncio.sleep(0.5)
while not self.answered: pass
# wait until user input is given
async def wait_user_ip_callback(self): await self.check_user_status()
def publish(self, user_ip):
# publish goal location
msg = Float32MultiArray()
msg.data = user_ip
self._plan_prob_publisher.publish(msg) # send the goal position
self.get_logger().info('User input responded under the topic: respond_user_ip, Data: {0}'.format(user_ip))
if __name__ == '__main__': # main()
import rostest
rostest.rosrun(PKG, 'process_node', ProcessNode, sys.argv)
And I would like to launch it as when I use the actual node but changing it for the testing one, my test_sm.launch.py
:
# ROS imports
from launch import LaunchDescription
from launch_ros.actions import Node
def ...