我正在开发带有YOLO和ZED立体相机(60fps)的自动录制系统。
我每秒运行YOLO,当检测到人时,继续录制视频。 如果出现 5 次 False 结果,则停止记录。
但是,当 YOLO 运行时,帧会丢失并且不会记录在视频中。 所以我想在我的代码中应用异步函数,但它不起作用。
这是我的测试伪代码。 视频录制完美,没有掉帧,但是 yolo 没有运行。
import time
from utils.detect import YoloONNX
from common.config import VIDEO_CODEC
from utils.util import *
import cv2
import imutils
import os
import numpy as np
import pyzed.sl as sl
import asyncio
model = YoloONNX("./models/yolov7.onnx")
async def prepare_detect_person(frame):
try:
print('prepare_detect_person')
frame = imutils.resize(frame, width=600)
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, detect_person, frame)
except Exception as e:
print(f"Error occurred: {str(e)}")
def detect_person(frame):
res = model.onnx_inference(frame)
async def main():
zed = sl.Camera()
init_params = sl.InitParameters()
init_params.camera_resolution = sl.RESOLUTION.HD720
init_params.camera_fps = 60
err = zed.open(init_params)
if err != sl.ERROR_CODE.SUCCESS:
print("Camera Open : "+repr(err)+". Exit program.")
exit()
fourcc = cv2.VideoWriter_fourcc(*VIDEO_CODEC)
video_writer = cv2.VideoWriter('./async_test.mp4', fourcc, 60,(1280, 720))
i = 0
max_frame = 10000
image = sl.Mat()
runtime_parameters = sl.RuntimeParameters()
while i < max_frame:
if zed.grab(runtime_parameters) == sl.ERROR_CODE.SUCCESS:
zed.retrieve_image(image, sl.VIEW.LEFT)
frame = image.get_data()
frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB)
if i % 30 == 0:
print(f'start detect person')
# await prepare_detect_person(frame)
asyncio.create_task(prepare_detect_person(frame))
i += 1
video_writer.write(frame)
video_writer.release()
zed.close()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
运行YOLO模型时可以进行视频写入而不掉帧吗?
使用此代码更正它
import cv2
import imutils
import asyncio
import numpy as np
import pyzed.sl as sl
from utils.detect import YoloONNX
from common.config import VIDEO_CODEC
model = YoloONNX("./models/yolov7.onnx")
async def detect_person(frame):
try:
print('Detecting person...')
res = model.onnx_inference(frame)
return res # Assuming the model.onnx_inference returns something indicating detection success
except Exception as e:
print(f"Error occurred: {str(e)}")
return False
async def main():
zed = sl.Camera()
init_params = sl.InitParameters()
init_params.camera_resolution = sl.RESOLUTION.HD720
init_params.camera_fps = 60
err = zed.open(init_params)
if err != sl.ERROR_CODE.SUCCESS:
print("Camera Open : "+repr(err)+". Exit program.")
exit()
fourcc = cv2.VideoWriter_fourcc(*VIDEO_CODEC)
video_writer = cv2.VideoWriter('./async_test.mp4', fourcc, 60,(1280, 720))
i = 0
max_frame = 10000
image = sl.Mat()
runtime_parameters = sl.RuntimeParameters()
false_results_count = 0
while i < max_frame:
if zed.grab(runtime_parameters) == sl.ERROR_CODE.SUCCESS:
zed.retrieve_image(image, sl.VIEW.LEFT)
frame = image.get_data()
frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB)
if i % 30 == 0:
detection_result = await detect_person(frame)
if detection_result:
false_results_count = 0
else:
false_results_count += 1
if false_results_count >= 5:
print("Stopping recording...")
break
i += 1
video_writer.write(frame)
video_writer.release()
zed.close()
if __name__ == "__main__":
asyncio.run(main())