Test a node that subscribes and publishes to other nodes

asked 2019-07-26 08:37:21 -0500

hect1995 gravatar image

updated 2019-07-29 06:03:26 -0500

Hi, I have to test that a truck goes over a set of points that are provided in a JSON file. My ROS2 project contains several packages like State Machine, Planning Problem or Simulation. I currently have a node in planning_problem that receives current location of the truck from the state machine and publishes next goal location after asking the user where he/she wants to go. In order to test the whole system I have thought about creating a node very similar to this one but that already has a set of points saved and call it instead of the original one. So I would subscribe to the state machine topic to know my current location, assert that is the desired one and publish the new one until reaching the goal. The problem is how to combine subscribers, publishers and tests at the same time. My new node process_node.py looks like this:

#!/usr/bin/env python
import sys
import unittest
import json

from rclpy.node import Node
from std_msgs.msg import Float32MultiArray
import rclpy

from custom_msg.msg import CustomMsg
from gui_pkg.gui_window import gui_hpp

class ProcessNode(unittest.TestCase):
    def __init__(self, *args):
        self.node = rclpy.create_node('process_node')    
        # publish goal pose
        self._plan_prob_publisher = self.node.create_publisher(Float32MultiArray, 'send_goal_pose')
        self.gui_window = gui_hpp()  # another class useful for some methods it contains 
        self.current_loc = []
        self.answered = False
        with open("../../launch/launch/system_test/test_positions.json") as json_file:
            self.manual_positions = json.load(json_file)
        self.next_pose = [0, 0, 0, 0]

    def obtain_array_positions(self):
        self.gui_window.multiple_path = []  # where we concatenate all the positions that required to be done for that slot
        for coordinate in self.manual_positions:

    def listener_callback(self, msg):
        self.current_loc = msg.coordinates
        self.answered = True

    def test_correct_positions(self):
        self._cur_loc_subscriber = self.node.create_subscription(CustomMsg, 'request_user_ip', self.listener_callback)
        while self.gui_window.is_multiple_path(): # iterate for all the steps
            self.node.loop.run_until_complete(self.wait_user_ip_callback()) # wait until receiving current location from SM
            self.assertEqual(self.current_loc, self.next_pose) # assert current location is equal to the goal position from previous step
            self.next_pose = self.gui_window.get_next_pose() # I get the next goal and remove the previous one from memory
            self.answered = False
            self.publish(self.next_pose) # publish the new goal

    # keep checking if message is received
    async def check_user_status(self):
        await asyncio.sleep(0.5)
        while not self.answered: pass

    # wait until user input is given
    async def wait_user_ip_callback(self): await self.check_user_status()

    def publish(self, user_ip):
        # publish goal location
        msg = Float32MultiArray()
        msg.data = user_ip
        self._plan_prob_publisher.publish(msg) # send the goal position
        self.get_logger().info('User input responded under the topic: respond_user_ip, Data: {0}'.format(user_ip))

if __name__ == '__main__':  # main()
    import rostest

    rostest.rosrun(PKG, 'process_node', ProcessNode, sys.argv)

And I would like to launch it as when I use the actual node but changing it for the testing one, my test_sm.launch.py:

# ROS imports
from launch import LaunchDescription
from launch_ros.actions import Node

def ...
edit retag flag offensive close merge delete