Robotics StackExchange | Archived questions

How to export image and video data from a bag file in ros2

I have bag file in ros2 rosbag2_2023_06_21-22_18_10_0.db3 which record the topic /image_raw/compressed I can visualize the image by ros2 run rqt_image_view rqt_image_view. now i want to convert back the bag file to .mp4 how can i do that?

Asked by Mubashir alam on 2023-06-22 02:08:58 UTC

Comments

Answers

I had a similar issue a week or two ago. Unfortunately I don't believe there's a neat solution as a bag is a container for one or multiple topics.

The solution I landed on was to make a simple python node to record video from (a) topic(s) of interest, then using cv_bridge and OpenCV's VideoWriter write that into the output file while playing back the bag.

Make sure you remember to have a destroy_node function that will release the VideoWriter before you call rclpy.shutdown() to avoid having the video not written correctly.

Asked by Nilaos on 2023-06-22 23:14:32 UTC

Comments

can you kindly shear the script

Asked by Mubashir alam on 2023-06-22 23:21:16 UTC

I don't have it on me unfortunately. https://stackoverflow.com/questions/72661821/closing-a-videowriter-in-a-ros2-node is similar and may be a good reference point?

Asked by Nilaos on 2023-06-22 23:35:31 UTC

I write a python script that subscribe to /image_raw/compressed and save the video as output_video.mp4. here is the script

``` import rclpy from rclpy.node import Node from sensor_msgs.msg import CompressedImage import cv_bridge import cv2 import numpy as np

class ImageToVideoConverter(Node): def init(self): super().init('image_to_video_converter') self.bridge = cv_bridge.CvBridge() self.subscription = self.create_subscription( CompressedImage, '/image_raw/compressed', self.image_callback, 10 ) self.video_writer = None

def image_callback(self, msg):
    try:
        np_arr = np.frombuffer(msg.data, np.uint8)
        cv_image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
        if self.video_writer is None:
            self.init_video_writer(cv_image)
        self.video_writer.write(cv_image)
    except Exception as e:
        self.get_logger().error('Error processing image: %s' % str(e))

def init_video_writer(self, image):
    try:
        height, width, _ = image.shape
        video_format = 'mp4'  # or any other video format supported by OpenCV
        video_filename = 'output_video.' + video_format
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        fps = 30  # Frames per second
        self.video_writer = cv2.VideoWriter(video_filename, fourcc, fps, (width, height))
    except Exception as e:
        self.get_logger().error('Error initializing video writer: %s' % str(e))

def destroy_node(self):
    if self.video_writer is not None:
        self.video_writer.release()
    super().destroy_node()

def main(args=None): rclpy.init(args=args) image_to_video_converter = ImageToVideoConverter() rclpy.spin(image_to_video_converter) image_to_video_converter.destroy_node() rclpy.shutdown()

if name == 'main': main()

```

Asked by Mubashir alam on 2023-06-23 00:54:45 UTC

Comments