junya's profile - activity

2020-12-17 21:26:38 -0500 answered a question Test for when a rospy publisher become available

I use the following magic to get a publisher. This waits until the number of subscribers assigned to the publisher would be the same

2020-12-13 09:46:22 -0500 answered a question Can you write unittests that use the parameter server?

I run a dummy parameter server in the unit test. See my implementation: https://github.com/groove-x/mqtt_bridge/pull/37

2017-01-20 05:44:45 -0500 answered a question With rospy, messages don't seem to be recieved if published soon after creating the publisher

I was able to work around this behavior by comparing the number of subscribers on the topic with the number of connections on the publisher.

# -*- coding: utf-8 -*-
import time

import rosgraph
import rospy
from std_msgs.msg import String


def get_publisher(topic_path, msg_type, **kwargs):
    pub = rospy.Publisher(topic_path, msg_type, **kwargs)
    num_subs = len(_get_subscribers(topic_path))

    for i in range(10):
        num_cons = pub.get_num_connections()
        if num_cons == num_subs:
            break
        time.sleep(0.1)
    else:
        raise RuntimeError("mismatch !!")

    return pub


def _get_subscribers(topic_path):
    u""" Return the names of the subscriber nodes that subscribe to topic_path """
    # Implemented with reference to the rostopic implementation
    topic_path = rosgraph.names.script_resolve_name('rostopic', topic_path)
    ros_master = rosgraph.Master('/rostopic')
    state = ros_master.getSystemState()
    subs = []
    for sub in state[1]:
        if sub[0] == topic_path:
            subs.extend(sub[1])
    return subs


if __name__ == "__main__":
    rospy.init_node("test")
    pub = get_publisher('/testing', String, queue_size=1)
    pub.publish(String("Hello World"))
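
The retry loop in get_publisher could also be written against a deadline instead of a fixed number of iterations. The following is only a minimal sketch and not part of the original answer: wait_until, its timeout and interval parameters, and the FakePublisher class are hypothetical names, and the ROS calls are replaced by a plain stand-in so that the snippet runs on Python 3 without a ROS master.

```python
import time


def wait_until(predicate, timeout=1.0, interval=0.1):
    """Poll predicate() until it returns True or timeout seconds elapse.

    Returns True if the predicate succeeded, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


class FakePublisher(object):
    """Hypothetical stand-in for a publisher whose connection appears late."""

    def __init__(self, connect_after):
        self._calls = 0
        self._connect_after = connect_after

    def get_num_connections(self):
        # Report one connection only after a few polls, mimicking the delay
        # between creating a publisher and a subscriber connecting to it.
        self._calls += 1
        return 1 if self._calls > self._connect_after else 0


if __name__ == "__main__":
    expected_subscribers = 1  # assumed value; the answer reads it from the master
    pub = FakePublisher(connect_after=3)
    ok = wait_until(
        lambda: pub.get_num_connections() == expected_subscribers,
        timeout=1.0,
        interval=0.01,
    )
    print("connected" if ok else "timed out")
```

With a real rospy publisher, the predicate would presumably be the same comparison as in get_publisher, that is, pub.get_num_connections() against the subscriber count taken from the master. Returning False on timeout leaves it to the caller to decide whether a mismatch should raise.
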
2016-11-30 02:07:49 -0500 received badge  Enthusiast
2016-11-30 02:07:46 -0500 received badge  Enthusiast
2016-11-18 05:23:53 -0500 received badge  Supporter (source)