我在ros2中有一个包文件
rosbag2_2023_06_21-22_18_10_0.db3
,它记录了主题/image_raw/compressed
我可以通过ros2 run rqt_image_view rqt_image_view
可视化图像。现在我想将 bag 文件转换回 .mp4 我该怎么做?
这里是订阅
/image_raw/compressed
并将视频保存为 output_video.mp4
的 python 脚本。
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import CompressedImage
import cv_bridge
import cv2
import numpy as np
class ImageToVideoConverter(Node):
def __init__(self):
super().__init__('image_to_video_converter')
self.bridge = cv_bridge.CvBridge()
self.subscription = self.create_subscription(
CompressedImage,
'/image_raw/compressed',
self.image_callback,
10
)
self.video_writer = None
def image_callback(self, msg):
try:
np_arr = np.frombuffer(msg.data, np.uint8)
cv_image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
if self.video_writer is None:
self.init_video_writer(cv_image)
self.video_writer.write(cv_image)
except Exception as e:
self.get_logger().error('Error processing image: %s' % str(e))
def init_video_writer(self, image):
try:
height, width, _ = image.shape
video_format = 'mp4' # or any other video format supported by OpenCV
video_filename = 'output_video.' + video_format
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
fps = 30 # Frames per second
self.video_writer = cv2.VideoWriter(video_filename, fourcc, fps, (width, height))
except Exception as e:
self.get_logger().error('Error initializing video writer: %s' % str(e))
def destroy_node(self):
if self.video_writer is not None:
self.video_writer.release()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
image_to_video_converter = ImageToVideoConverter()
rclpy.spin(image_to_video_converter)
image_to_video_converter.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
添加到之前的答案中,这里有一个NOT ros 相关代码,可以将 ros2 包反序列化为图像(帧),从那里你可以对它们做任何你想做的事情。
from rosbags.highlevel import AnyReader
from pathlib import Path
import cv2
from rosbags.image import message_to_cvimage
count = 0
with AnyReader([Path('path_to_bag_dir_which_contains_metadata.yamal_file')]) as reader:
# topic and msgtype information is available on .connections list
for connection in reader.connections:
print(connection.topic, connection.msgtype)
for connection, timestamp, rawdata in reader.messages():
if connection.topic == '/basler/image': # topic Name of images
msg = reader.deserialize(rawdata, connection.msgtype)
img = message_to_cvimage(msg, 'bgr8') # change encoding type if needed
print(img)
cv2.imwrite("output/folder/frame%06i.png" % count, img)
count += 1
需要安装rosbags python lib
pip install rosbags
pip install rosbags-image