The `rospy.wait_for_message` method does not work in test case

asked 2019-07-24 02:43:05 -0500

kump gravatar image

updated 2019-07-24 03:53:33 -0500

I am trying to write some tests for my python node my_node. This node should start sending Float64 messages with value 0.0 on the wheel command topics after it recieves a MyMsg message with a deflection field in z-axis set to higher than 0.005 (the threshold from param server). So I thought I would run the my_node node, create another node in the test file that would send this message with deflection exceeding the threshold, and then wait for the messages from the both wheel command topics and see if they are zero.

I have confirmed that the my_node is running. The threshold parameter is on the param server and is read by the my_node. Also the wheels command topics are present in rospy.get_published_topics('my_robot') list. The test runs as expected until it gets to the rospy.wait_for_message where it gets stucked and the test ends with an error on the timeout.

It seems to me like this should work (the whole test script is shown below). Can you see any reason why this does not work? Or how to debug further?

the error:

Traceback (most recent call last):
  File "/home/linux/catkin_ws/src/my_package/tests/scripts/test_my_node.py", line 103, in <module>
    rosunit.unitrun('my_package', 'test_my_node', TestMyNode)
  File "/opt/ros/kinetic/lib/python2.7/dist-packages/rosunit/pyunit.py", line 99, in unitrun
    result = create_xml_runner(package, test_name, result_file).run(suite)
  File "/opt/ros/kinetic/lib/python2.7/dist-packages/rosunit/xmlrunner.py", line 261, in run
    test(result)
  File "/usr/lib/python2.7/unittest/suite.py", line 70, in __call__
    return self.run(*args, **kwds)
  File "/usr/lib/python2.7/unittest/suite.py", line 108, in run
    test(result)
  File "/usr/lib/python2.7/unittest/case.py", line 393, in __call__
    return self.run(*args, **kwds)
  File "/usr/lib/python2.7/unittest/case.py", line 329, in run
    testMethod()
  File "/home/linux/catkin_ws/src/my_package/tests/scripts/test_my_node.py", line 89, in test_stop_messages_were_sent
    left_wheel_msg = rospy.wait_for_message('/my_robot/joint_rear_left_wheel_controller/command', Float64)
  File "/opt/ros/kinetic/lib/python2.7/dist-packages/rospy/client.py", line 426, in wait_for_message
    raise rospy.exceptions.ROSInterruptException("rospy shutdown")
rospy.exceptions.ROSInterruptException: rospy shutdown
[Testcase: testTestMyNode] ... ERROR!
ERROR: max time [60.0s] allotted for test [TestMyNode] of type [my_package/test_my_node.py]
  File "/usr/lib/python2.7/unittest/case.py", line 329, in run
    testMethod()
  File "/opt/ros/kinetic/lib/python2.7/dist-packages/rostest/runner.py", line 148, in fn
    self.test_parent.run_test(test)
  File "/opt/ros/kinetic/lib/python2.7/dist-packages/rostest/rostest_parent.py", line 132, in run_test
    return self.runner.run_test(test)
  File "/opt/ros/kinetic/lib/python2.7/dist-packages/roslaunch/launch.py", line 684, in run_test
    (test.time_limit, test.test_name, test.package, test.type))
--------------------------------------------------------------------------------

[ROSTEST]-----------------------------------------------------------------------


SUMMARY
 * RESULT: FAIL
 * TESTS: 0
 * ERRORS: 1
 * FAILURES: 0

the test script:

 #!/usr/bin/env python
import subprocess
import shlex
import sys
import signal
import psutil
import unittest

import roslaunch
import rospy
import rosnode
from my_msgs.msg import *
from std_msgs.msg import Float64

def kill_child_processes(parent_pid, sig=signal.SIGTERM ...
(more)
edit retag flag offensive close merge delete