Test a node that subscribes and publishes to other nodes
Hi, I have to test that a truck goes over a set of points that are provided in a JSON file. My ROS2 project contains several packages like State Machine, Planning Problem or Simulation. I currently have a node in planningproblem that receives current location of the truck from the state machine and publishes next goal location after asking the user where he/she wants to go. In order to test the whole system I have thought about creating a node very similar to this one but that already has a set of points saved and call it instead of the original one. So I would subscribe to the state machine topic to know my current location, assert that is the desired one and publish the new one until reaching the goal. The problem is how to combine subscribers, publishers and tests at the same time. My new node `processnode.py` looks like this:
#!/usr/bin/env python
import sys
import unittest
import json
from rclpy.node import Node
from std_msgs.msg import Float32MultiArray
import rclpy
from custom_msg.msg import CustomMsg
from gui_pkg.gui_window import gui_hpp
class ProcessNode(unittest.TestCase):
def __init__(self, *args):
rclpy.init(args=*args)
self.node = rclpy.create_node('process_node')
# publish goal pose
self._plan_prob_publisher = self.node.create_publisher(Float32MultiArray, 'send_goal_pose')
self.gui_window = gui_hpp() # another class useful for some methods it contains
self.current_loc = []
self.answered = False
with open("../../launch/launch/system_test/test_positions.json") as json_file:
self.manual_positions = json.load(json_file)
self.obtain_array_positions()
self.next_pose = [0, 0, 0, 0]
def obtain_array_positions(self):
self.gui_window.multiple_path = [] # where we concatenate all the positions that required to be done for that slot
for coordinate in self.manual_positions:
self.gui_window.multiple_path.append(
[
float(coordinate["x"]),
float(coordinate["y"]),
float(coordinate["orientation"]),
float(coordinate["h_angle"])
]
)
def listener_callback(self, msg):
self.current_loc = msg.coordinates
self.answered = True
def test_correct_positions(self):
self._cur_loc_subscriber = self.node.create_subscription(CustomMsg, 'request_user_ip', self.listener_callback)
while self.gui_window.is_multiple_path(): # iterate for all the steps
self.node.loop.run_until_complete(self.wait_user_ip_callback()) # wait until receiving current location from SM
self.assertEqual(self.current_loc, self.next_pose) # assert current location is equal to the goal position from previous step
self.next_pose = self.gui_window.get_next_pose() # I get the next goal and remove the previous one from memory
self.answered = False
self.publish(self.next_pose) # publish the new goal
# keep checking if message is received
async def check_user_status(self):
await asyncio.sleep(0.5)
while not self.answered: pass
# wait until user input is given
async def wait_user_ip_callback(self): await self.check_user_status()
def publish(self, user_ip):
# publish goal location
msg = Float32MultiArray()
msg.data = user_ip
self._plan_prob_publisher.publish(msg) # send the goal position
self.get_logger().info('User input responded under the topic: respond_user_ip, Data: {0}'.format(user_ip))
if __name__ == '__main__': # main()
import rostest
rostest.rosrun(PKG, 'process_node', ProcessNode, sys.argv)
And I would like to launch it as when I use the actual node but changing it for the testing one, my test_sm.launch.py
:
# ROS imports
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
ld = LaunchDescription([
Node(package='gui_pkg', node_executable='process_node', output='screen'),
Node(package='planning_prob_pkg', node_executable='planning_prob_node', output='screen'),
Node(package='planner_pkg', node_executable='planner_node', output='screen'),
Node(package='simulation_pkg', node_executable='simulation_node', output='screen'),
Node(package='state_machine_pkg', node_executable='state_machine_node', output='screen')
])
return ld
if __name__ == '__main__':
generate_launch_description()
I would appreciate if somebody can:
When I run the presented structure
ros2 launch launch test_sm.launch.py
I get the errorAttributeError: module 'gui_pkg.process_node' has no attribute 'main'
which makes sense so I would like to know how to launch all the nodes as always and the process_node as a test node which includes all the Node functionalities to interact with the rest of nodes and still be able toassert
Another possibility that comes into my mind is leaving all the nodes as they are and just create a node that subscribes to the topic where I publish my current location and asserts that is corrected but again I do not know how to launch a testing node.
I am using Ubuntu 18.04; ROS2 Crystal and Python 3.6.2
Asked by hect1995 on 2019-07-26 08:37:21 UTC
Comments