Mubashir
Mubashir

Reputation: 135

How to export image and video data from a bag file in ros2

I have bag file in ros2 rosbag2_2023_06_21-22_18_10_0.db3 which record the topic /image_raw/compressed I can visualize the image by ros2 run rqt_image_view rqt_image_view. now i want to convert back the bag file to .mp4 how can i do that?

Upvotes: 1

Views: 4850

Answers (2)

Baza
Baza

Reputation: 31

Adding to the previous answer, here is a NOT ros-dependent code that can deserialize a ros2 bag to images (frames) from there you can do whatever you want with them.

from rosbags.highlevel import AnyReader
from pathlib import Path
import cv2
from rosbags.image import message_to_cvimage
count = 0
with AnyReader([Path('path_to_bag_dir_which_contains_metadata.yamal_file')]) as reader:
    # topic and msgtype information is available on .connections list
    for connection in reader.connections:
        print(connection.topic, connection.msgtype)
    for connection, timestamp, rawdata in reader.messages():
        if connection.topic == '/basler/image': # topic Name of images
            msg = reader.deserialize(rawdata, connection.msgtype)
            img = message_to_cvimage(msg, 'bgr8') # change encoding type if needed
            cv2.imwrite("output/folder/frame%06i.png" % count, img)
            count += 1

You need to install rosbags python lib

pip install rosbags
pip install rosbags-image

Upvotes: 3

Mubashir
Mubashir

Reputation: 135

Here is the python script that subscribe to /image_raw/compressed and save the video as output_video.mp4.

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import CompressedImage
import cv_bridge
import cv2
import numpy as np


class ImageToVideoConverter(Node):
    def __init__(self):
        super().__init__('image_to_video_converter')
        self.bridge = cv_bridge.CvBridge()
        self.subscription = self.create_subscription(
            CompressedImage,
            '/image_raw/compressed',
            self.image_callback,
            10
        )
        self.video_writer = None

    def image_callback(self, msg):
        try:
            np_arr = np.frombuffer(msg.data, np.uint8)
            cv_image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
            if self.video_writer is None:
                self.init_video_writer(cv_image)
            self.video_writer.write(cv_image)
        except Exception as e:
            self.get_logger().error('Error processing image: %s' % str(e))

    def init_video_writer(self, image):
        try:
            height, width, _ = image.shape
            video_format = 'mp4'  # or any other video format supported by OpenCV
            video_filename = 'output_video.' + video_format
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            fps = 30  # Frames per second
            self.video_writer = cv2.VideoWriter(video_filename, fourcc, fps, (width, height))
        except Exception as e:
            self.get_logger().error('Error initializing video writer: %s' % str(e))

    def destroy_node(self):
        if self.video_writer is not None:
            self.video_writer.release()
        super().destroy_node()


def main(args=None):
    rclpy.init(args=args)
    image_to_video_converter = ImageToVideoConverter()
    rclpy.spin(image_to_video_converter)
    image_to_video_converter.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

Upvotes: 0

Related Questions