我有一个程序,有很多音乐甲板(甲板1,甲板2,music_clip_deck,speakers_deck,ip_call_1,ip_call_2,ip_call_3)。每个甲板都在单独的过程中工作。我用来裁剪 mp3 文件/重传流/来自麦克风的语音/来自 aiortc-pyav 的语音的块时间是 125 毫秒。之后,我填充一些队列(每个单独的进程一个),并将最终队列发送到最终线程进行最终音频处理,然后再收听并传输给客户端。
如何将所有进程同步在一起,以便每个进程的运行时间恰好需要 125 毫秒?
这是一张求助图:
这种方法可能根本没有帮助:
class Deck_1_Proc(Process):
...
...
...
def run(self):
while(True):
t1 = time.time()
...
...
...
t2 = time.time()
if t2 - t1 < 0.125:
time.sleep(0.125 - (t2 - t1))
也许更好的方法应该使用类似 javascript setInterval 的时间参数:125msec
from threading import Event, Thread
def call_repeatedly(interval, func, *args):
stopped = Event()
def loop():
while not stopped.wait(interval): # the first call is in `interval` secs
func(*args)
Thread(target=loop).start()
return stopped.set
#call:
cancel_future_calls = call_repeatedly(0.125, run)
#stopping to app termination:
cancel_future_calls()
主要问题是大多数计时器都会漂移,并且
sleep
不准确,甚至QTimer也不准确,因此稳定的计时器(从某种意义上说,第100个滴答接近12.5秒)必须执行类似的操作这个。
import time
from multiprocessing import Condition
def infinite_heartbreat(cv: Condition):
next_beat = time.time()
while True:
next_beat += 0.125
time.sleep(next_beat - time.time())
with cv:
cv.notify_all()
您可以使用 Condition Vairable 轻松同步所有进程以同时唤醒,但如果其中一个进程滞后几毫秒,您可能需要 multiprocessing.Value 来确保它们仅在滞后时等待后面如下:
import time
from multiprocessing import Condition, Value, Process
def infinite_heartbreat(cv: Condition, frame: Value):
next_beat = time.time()
while True:
next_beat += 0.125
time.sleep(next_beat - time.time())
with cv:
frame.value += 1
cv.notify_all()
def worker(cv, frame_number, worker_id):
current_frame = frame_number.value
while True:
with cv:
if current_frame > frame_number.value:
cv.wait()
continue
print(f"processed frame {current_frame} in worker {worker_id}")
current_frame += 1
if __name__ == "__main__":
condition = Condition()
frame = Value('Q')
processes = []
for i in range(4):
process = Process(target=worker, args=(condition,frame,i))
process.start()
processes.append(process)
infinite_heartbreat(condition, frame)