Python 中的并行化以获取多个流

问题描述 投票:0回答:1

我有闭路电视摄像机流的 URL 列表。我想流式传输它们,读取它们的帧,然后在读取它们时将它们保存在文件夹中。我编写了以下代码,效果非常好:

import cv2
import numpy as np
import os
import datetime
import threading

CAMERAS_TXT_FILE_NAME = "generation/videos.txt"
CAMERAS_FOLDER_PATH_NAME = "generation/cameras"

class camThread(threading.Thread):
    def __init__(self, name, url):
        threading.Thread.__init__(self)
        self.name = name
        self.url = url
    def run(self):
        print("Starting " + self.name)
        camPreview(self.name, self.url)

def camPreview(name, url):
    cam = cv2.VideoCapture(url)
    while(cam.isOpened()):
        ret, frame = cam.read()
        if ret == False:
            break
        cv2.imwrite(CAMERAS_FOLDER_PATH_NAME + "/" + name + "/" + str(datetime.datetime.now().time())+'.jpg',frame)  

#CV2 setup
cv2.CAP_PROP_BUFFERSIZE 

#Video Links Setup
with open(CAMERAS_TXT_FILE_NAME) as f:
    lines = f.readlines()

if (not os.path.isdir(CAMERAS_FOLDER_PATH_NAME)):
        os.makedirs(CAMERAS_FOLDER_PATH_NAME)

for i in range(len(lines)):
    url = lines[i].strip()
    camName = url.split('/')[-2].split('.')[0]
    CAMERA_PATH = CAMERAS_FOLDER_PATH_NAME + "/" + camName
    if (not os.path.isdir(CAMERA_PATH)):
        os.makedirs(CAMERA_PATH)
    cam = camThread(camName, url)
    cam.start()

但是,我意识到,当流数量增加时,线程会继续下载同一相机的帧,但甚至不会开始下载其他相机。我希望他们保存尽可能少的帧数,然后继续下一帧,以便大多数摄像机都被下载,而不是少数摄像机接近实时。

也许线程不是执行此操作的方法。我是否应该对列表进行多重处理,以便他们获取一个数字并使用给定的相机?使用多重处理的优点和缺点是什么,我对这些概念相对较新,但据我了解,多重处理更适合我的情况,因为我不需要进程共享内存。

编辑:我研究了更多一点,我相信最有效的方法是将相机划分在多个处理器之间,然后由于IO等待时间而在它们上使用线程。

python multiprocessing video-streaming
1个回答
0
投票

很高兴您已经发现了当前线程实现的问题,并且您正在考虑使用多处理。就您而言,多重处理确实更适合,因为它可以更好地并行执行 I/O 密集型任务,例如从不同相机流下载帧。

以下是使用多处理的一些优点和缺点:

优点:

  1. 并行执行:多处理允许真正的并行执行,因为每个进程都在自己的地址空间中运行,这对于 I/O 密集型任务(例如从不同相机下载帧)非常有利。

  2. 隔离性:每个进程都有自己的内存空间,因此无需担心共享数据结构或同步问题。这可以简化设计并使代码更加健壮。

  3. 利用多个核心: 多处理可以利用多个 CPU 核心,从而在多核系统上实现更好的整体性能。

缺点:

  1. 通信开销:进程之间的通信可能会带来一些开销。在进程之间传递数据需要序列化和反序列化,这可能会很昂贵。

  2. 资源消耗:每个进程都有自己的内存空间,创建过多的进程会导致内存消耗增加。根据可用资源管理进程数量非常重要。

现在,根据您的具体情况,您可以修改代码以使用多重处理。以下是如何构建它的示例:

import cv2
import numpy as np
import os
import datetime
from multiprocessing import Process

CAMERAS_TXT_FILE_NAME = "generation/videos.txt"
CAMERAS_FOLDER_PATH_NAME = "generation/cameras"

def camPreview(name, url):
    cam = cv2.VideoCapture(url)
    while(cam.isOpened()):
        ret, frame = cam.read()
        if ret == False:
            break
        cv2.imwrite(CAMERAS_FOLDER_PATH_NAME + "/" + name + "/" + str(datetime.datetime.now().time()) + '.jpg', frame)

def process_camera(url):
    camName = url.split('/')[-2].split('.')[0]
    CAMERA_PATH = CAMERAS_FOLDER_PATH_NAME + "/" + camName
    if not os.path.isdir(CAMERA_PATH):
        os.makedirs(CAMERA_PATH)
    camPreview(camName, url)

if __name__ == '__main__':
    # Video Links Setup
    with open(CAMERAS_TXT_FILE_NAME) as f:
        lines = f.readlines()

    if not os.path.isdir(CAMERAS_FOLDER_PATH_NAME):
        os.makedirs(CAMERAS_FOLDER_PATH_NAME)

    processes = []

    for i in range(len(lines)):
        url = lines[i].strip()
        process = Process(target=process_camera, args=(url,))
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

此代码使用

multiprocessing.Process
类为每个摄像头流创建单独的进程。每个进程都运行
process_camera
函数,该函数又为该特定摄像机流运行
camPreview
函数。

© www.soinside.com 2019 - 2024. All rights reserved.