ROS Resources: Documentation | Support | Discussion Forum | Index | Service Status | ros @ Robotics Stack Exchange
Ask Your Question

Revision history [back]

Okay, I spent hours trying to figure this out, and I solved it! I'll put my code below, but in general my approach is to use the buffer.lookup_transform to get the transform, AND the broadcaster.sendTransform to publish a transform.

What this does is let me read a rosbag, re-publish any /tf or /tf_static messages, and then when it's time to convert frames I can use rospy.Time.now() and get the "current" transform, where "current" is whatever the most recently published transform is.

You might have an issue if your first data appears before your first /tf or /tf_static message appears; if you find this to be the case then you can optionally drop that data. For my personal use case, it doesn't matter. I tried removing the try/catch in the hopes that it'd speed up processing, but it didn't :(

Here's the code, enjoy!

import argparse
import rosbag
import rospy
import os.path
import tf2_ros
from tf2_sensor_msgs.tf2_sensor_msgs import do_transform_cloud

parser = argparse.ArgumentParser(description='Converts all pointclouds to a target frame.')
parser.add_argument('input', type=str, help='Input Rosbag')
parser.add_argument('output', type=str, help='Output Rosbag')

args = parser.parse_args()

# Force input/output files to be .bag files
if ".bag" not in args.input:
    print("Input file needs to have a *.bag ending")
    exit()
if ".bag" not in args.output:
    print("Output file needs to have a *.bag ending")
    exit()

# Does the input file exist?
if not os.path.isfile(args.input):
    print("Input file cannot be found.")
    exit()

# Need to initialize a ROS node to do the conversion
rospy.init_node("converter")

# Setup the tf2 listener:
buffer = tf2_ros.Buffer()
listener = tf2_ros.TransformListener(buffer)

# Setup the tf2 broadcaster
broadcaster = tf2_ros.TransformBroadcaster()

# What are you trying to transform to?
target_frame = "vehicle"  # NOTE: You could use this as an input argument instead, for more flexibility!

# Transform the data!
with rosbag.Bag(args.output, 'w') as output:
    for topic, msg, t in rosbag.Bag(args.input).read_messages():

        # Inputs are copied as the basis for outputs
        new_msg = msg

        # Re-publish any /tf or /tf_static messages, so you can look those transforms up later
        if topic == "/tf":
            for msg_tf in msg.transforms:
                broadcaster.sendTransform(msg_tf)
        if topic == "/tf_static":
            for msg_tf in msg.transforms:
                broadcaster.sendTransform(msg_tf)

        # For my use case, I'm trying to transform PointCloud2 messages. You could switch on your own topic here.
        if hasattr(new_msg, 'cloud'):
            try:
                transform = buffer.lookup_transform(target_frame,
                                                    msg.cloud.header.frame_id,
                                                    rospy.Time.now(),
                                                    rospy.Duration(0))
                new_msg.cloud = do_transform_cloud(msg.cloud, transform)
            except:
                print("Failed to find a transform to " + target_frame)
                # Choosing to continue here because you've got a cloud that wasn't transformed!
                continue
        # Output bag file has all the same topics in the input, but you're replacing the transformed messages!
        output.write(topic, new_msg, t)

# Don't forget to close the bag when you're done writing!
output.close()

Okay, I spent hours trying to figure this out, and I solved it! I'll put my code below, but in general my approach is to use the buffer.lookup_transform to get the transform, AND the broadcaster.sendTransform to publish a transform.

What this does is let me read a rosbag, re-publish any /tf or /tf_static messages, and then when it's time to convert frames I can use rospy.Time.now() and get the "current" transform, where "current" is whatever the most recently published transform is.

You might have an issue if your first data appears before your first /tf or /tf_static message appears; if you find this to be the case then you can optionally drop that data. For my personal use case, it doesn't matter. I tried removing the try/catch in the hopes that it'd speed up processing, but it didn't :(

Here's the code, enjoy!

import argparse
import rosbag
import rospy
import os.path
import tf2_ros
from tf2_sensor_msgs.tf2_sensor_msgs import do_transform_cloud

parser = argparse.ArgumentParser(description='Converts all pointclouds to a target frame.')
parser.add_argument('input', type=str, help='Input Rosbag')
parser.add_argument('output', type=str, help='Output Rosbag')

args = parser.parse_args()

# Force input/output files to be .bag files
if ".bag" not in args.input:
    print("Input file needs to have a *.bag ending")
    exit()
if ".bag" not in args.output:
    print("Output file needs to have a *.bag ending")
    exit()

# Does the input file exist?
if not os.path.isfile(args.input):
    print("Input file cannot be found.")
    exit()

# Need to initialize a ROS node to do the conversion
rospy.init_node("converter")

# Setup the tf2 listener:
buffer = tf2_ros.Buffer()
listener = tf2_ros.TransformListener(buffer)

# Setup the tf2 broadcaster
broadcaster = tf2_ros.TransformBroadcaster()

# What are you trying to transform to?
target_frame = "vehicle" "your_target_frame"  # NOTE: You could use this as an input argument instead, for more flexibility!

# Transform the data!
with rosbag.Bag(args.output, 'w') as output:
    for topic, msg, t in rosbag.Bag(args.input).read_messages():

        # Inputs are copied as the basis for outputs
        new_msg = msg

        # Re-publish any /tf or /tf_static messages, so you can look those transforms up later
        if topic == "/tf":
            for msg_tf in msg.transforms:
                broadcaster.sendTransform(msg_tf)
        if topic == "/tf_static":
            for msg_tf in msg.transforms:
                broadcaster.sendTransform(msg_tf)

        # For my use case, I'm trying to transform PointCloud2 messages. You could switch on your own topic here.
        if hasattr(new_msg, 'cloud'):
            try:
                transform = buffer.lookup_transform(target_frame,
                                                    msg.cloud.header.frame_id,
                                                    rospy.Time.now(),
                                                    rospy.Duration(0))
                new_msg.cloud = do_transform_cloud(msg.cloud, transform)
            except:
                print("Failed to find a transform to " + target_frame)
                # Choosing to continue here because you've got a cloud that wasn't transformed!
                continue
        # Output bag file has all the same topics in the input, but you're replacing the transformed messages!
        output.write(topic, new_msg, t)

# Don't forget to close the bag when you're done writing!
output.close()