ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange
Ask Your Question

Revision history [back]

click to hide/show revision 1
initial version

I don't believe this exists out-of-the-box.

Rosbag command line tools allow you to play back topics at a modified (possibly very slow) rate. If you want very strict control over the publishing, this code snippet should serve:

import roslib, rospy, rosbag
from std_srvs.srv import Empty

class BagByService(object):
    def __init__(self, bagfile):
        self.bag = rosbag.Bag(bagfile, 'r')
        self.message_generator = self.bag.read_messages()
        self.next_msgs_srv = rospy.Service('/next_msg', Empty, self.pub_next_msg)

        #Create a publisher for each topic
        self.publishers = {}
        for con in self.bag._get_connections():
            msg_class = roslib.message.get_message_class(con.datatype)
            self.publishers[con.topic] = rospy.Publisher(con.topic, msg_class)

    def pub_next_msg(self, req):
        topic, msg, time = self.message_generator.next()
        self.publishers[topic].publish(msg)

if __name__=='__main__':
    import sys
    rospy.init_node('bag_by_service')
    bbs = BagByService(sys.argv[1])
    rospy.spin()

Run this with the .bag file as the only commandline argument. Calling the Empty '/next_msg' service will publish the next message in the bag file. Clearly, more interesting things could be done from here (filtering by time, topic, etc.)

I don't believe this exists out-of-the-box.

Rosbag command line tools do allow you to play back topics at a modified (possibly very slow) rate. constant rate with --rate. If you want very strict control over the publishing, this code snippet should serve:

import roslib, rospy, rosbag
from std_srvs.srv import Empty

class BagByService(object):
    def __init__(self, bagfile):
        self.bag = rosbag.Bag(bagfile, 'r')
        self.message_generator = self.bag.read_messages()
        self.next_msgs_srv = rospy.Service('/next_msg', Empty, self.pub_next_msg)

        #Create a publisher for each topic
        self.publishers = {}
        for con in self.bag._get_connections():
            msg_class = roslib.message.get_message_class(con.datatype)
            self.publishers[con.topic] = rospy.Publisher(con.topic, msg_class)

    def pub_next_msg(self, req):
        topic, msg, time = self.message_generator.next()
        self.publishers[topic].publish(msg)

if __name__=='__main__':
    import sys
    rospy.init_node('bag_by_service')
    bbs = BagByService(sys.argv[1])
    rospy.spin()

Run this with the .bag file as the only commandline argument. Calling the Empty '/next_msg' service will publish the next message in the bag file. Clearly, more interesting things could be done from here (filtering by time, topic, etc.)