如何使用yolo异步函数进行视频写入

问题描述 投票:0回答:1

我正在开发带有YOLO和ZED立体相机(60fps)的自动录制系统。

我每秒运行YOLO,当检测到人时,继续录制视频。 如果出现 5 次 False 结果,则停止记录。

但是,当 YOLO 运行时,帧会丢失并且不会记录在视频中。 所以我想在我的代码中应用异步函数,但它不起作用。

这是我的测试伪代码。 视频录制完美,没有掉帧,但是 yolo 没有运行。

import time
from utils.detect import YoloONNX
from common.config import VIDEO_CODEC
from utils.util import *
import cv2
import imutils
import os
import numpy as np
import pyzed.sl as sl
import asyncio

model = YoloONNX("./models/yolov7.onnx")

async def prepare_detect_person(frame):
    try:
        print('prepare_detect_person')
        frame = imutils.resize(frame, width=600)
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, detect_person, frame)
    except Exception as e:
        print(f"Error occurred: {str(e)}")
def detect_person(frame):
    res = model.onnx_inference(frame)

async def main():
    zed = sl.Camera()

    init_params = sl.InitParameters()
    init_params.camera_resolution = sl.RESOLUTION.HD720
    init_params.camera_fps = 60

    err = zed.open(init_params)
    if err != sl.ERROR_CODE.SUCCESS:
        print("Camera Open : "+repr(err)+". Exit program.")
        exit()

    fourcc = cv2.VideoWriter_fourcc(*VIDEO_CODEC)
    video_writer = cv2.VideoWriter('./async_test.mp4', fourcc, 60,(1280, 720))
    i = 0
    max_frame = 10000
    image = sl.Mat()
    runtime_parameters = sl.RuntimeParameters()
    while i < max_frame:
        if zed.grab(runtime_parameters) == sl.ERROR_CODE.SUCCESS:
            zed.retrieve_image(image, sl.VIEW.LEFT)
            frame = image.get_data()
            frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB)
            if i % 30 == 0:
                print(f'start detect person')
                # await prepare_detect_person(frame)
                asyncio.create_task(prepare_detect_person(frame))
            i += 1
            video_writer.write(frame)

    video_writer.release()
    zed.close()

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

运行YOLO模型时可以进行视频写入而不掉帧吗?

python opencv asynchronous
1个回答
0
投票

使用此代码更正它

import cv2
import imutils
import asyncio
import numpy as np
import pyzed.sl as sl
from utils.detect import YoloONNX
from common.config import VIDEO_CODEC

model = YoloONNX("./models/yolov7.onnx")

async def detect_person(frame):
    try:
        print('Detecting person...')
        res = model.onnx_inference(frame)
        return res  # Assuming the model.onnx_inference returns something indicating detection success
    except Exception as e:
        print(f"Error occurred: {str(e)}")
        return False

async def main():
    zed = sl.Camera()

    init_params = sl.InitParameters()
    init_params.camera_resolution = sl.RESOLUTION.HD720
    init_params.camera_fps = 60

    err = zed.open(init_params)
    if err != sl.ERROR_CODE.SUCCESS:
        print("Camera Open : "+repr(err)+". Exit program.")
        exit()

    fourcc = cv2.VideoWriter_fourcc(*VIDEO_CODEC)
    video_writer = cv2.VideoWriter('./async_test.mp4', fourcc, 60,(1280, 720))
    i = 0
    max_frame = 10000
    image = sl.Mat()
    runtime_parameters = sl.RuntimeParameters()
    false_results_count = 0

    while i < max_frame:
        if zed.grab(runtime_parameters) == sl.ERROR_CODE.SUCCESS:
            zed.retrieve_image(image, sl.VIEW.LEFT)
            frame = image.get_data()
            frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB)

            if i % 30 == 0:
                detection_result = await detect_person(frame)
                if detection_result:
                    false_results_count = 0
                else:
                    false_results_count += 1

                if false_results_count >= 5:
                    print("Stopping recording...")
                    break

            i += 1
            video_writer.write(frame)

    video_writer.release()
    zed.close()

if __name__ == "__main__":
    asyncio.run(main())
© www.soinside.com 2019 - 2024. All rights reserved.