并行编程:同步进程

问题描述 投票:0回答:1

我有一个程序,有很多音乐甲板(甲板1,甲板2,music_clip_deck,speakers_deck,ip_call_1,ip_call_2,ip_call_3)。每个甲板都在单独的过程中工作。我用来裁剪 mp3 文件/重传流/来自麦克风的语音/来自 aiortc-pyav 的语音的块时间是 125 毫秒。之后,我填充一些队列(每个单独的进程一个),并将最终队列发送到最终线程进行最终音频处理,然后再收听并传输给客户端。

如何将所有进程同步在一起,以便每个进程的运行时间恰好需要 125 毫秒?

这是一张求助图:

这种方法可能根本没有帮助:

class Deck_1_Proc(Process):
...
...
...
    def run(self):
        while(True):
            t1 = time.time()
            ...
            ...
            ...
            t2 = time.time()
            if t2 - t1 < 0.125:
                time.sleep(0.125 - (t2 - t1))

也许更好的方法应该使用类似 javascript setInterval 的时间参数:125msec

from threading import Event, Thread

def call_repeatedly(interval, func, *args):
    stopped = Event()
    def loop():
        while not stopped.wait(interval): # the first call is in `interval` secs
            func(*args)
    Thread(target=loop).start()    
    return stopped.set

#call:
cancel_future_calls = call_repeatedly(0.125, run)
#stopping to app termination:
cancel_future_calls()
python parallel-processing multiprocessing synchronization
1个回答
0
投票

主要问题是大多数计时器都会漂移,并且

sleep
不准确甚至QTimer也不准确,因此稳定的计时器(从某种意义上说,第100个滴答接近12.5秒)必须执行类似的操作这个。

import time
from multiprocessing import Condition
def infinite_heartbreat(cv: Condition):
    next_beat = time.time()
    while True:
        next_beat += 0.125
        time.sleep(next_beat - time.time())
        with cv:
            cv.notify_all()

您可以使用 Condition Vairable 轻松同步所有进程以同时唤醒,但如果其中一个进程滞后几毫秒,您可能需要 multiprocessing.Value 来确保它们仅在滞后时等待后面如下:

import time
from multiprocessing import Condition, Value, Process
def infinite_heartbreat(cv: Condition, frame: Value):
    next_beat = time.time()
    while True:
        next_beat += 0.125
        time.sleep(next_beat - time.time())
        with cv:
            frame.value += 1
            cv.notify_all()

def worker(cv, frame_number, worker_id):
    current_frame = frame_number.value
    while True:
        with cv:
            if current_frame > frame_number.value:
                cv.wait()
                continue
        print(f"processed frame {current_frame} in worker {worker_id}")
        current_frame += 1

if __name__ == "__main__":
    condition = Condition()
    frame = Value('Q')
    processes = []
    for i in range(4):
        process = Process(target=worker, args=(condition,frame,i))
        process.start()
        processes.append(process)
    infinite_heartbreat(condition, frame)
© www.soinside.com 2019 - 2024. All rights reserved.