我有一个表示视频的帧DASK阵列以及希望创建多个视频文件。我使用的imageio
库,让我到帧“追加”至FFmpeg子。所以,我可能有这样的事情:
my_frames = [[arr1f1, arr1f2, arr1f3], [arr2f1, arr2f2, arr2f3], ...]
因此,每个内部列表表示一个视频(或产品)的帧。我正在寻找发送/提交帧的最佳途径,同时也写入帧imageio
,因为他们完成(按顺序)来计算。为了使问题更加复杂高于内部列表实际上是发电机,可以是100秒或帧的1000。也请记住,因为imageio
是如何工作的,我认为它需要在一个单一的过程中存在。这是我到目前为止工作的简化版本:
for frame_arrays in frames_to_write:
# 'frame_arrays' is [arr1f1, arr2f1, arr3f1, ...]
future_list = _client.compute(frame_arrays)
# key -> future
future_dict = dict(zip(frame_keys, future_list))
# write the current frame
# future -> key
rev_future_dict = {v: k for k, v in future_dict.items()}
result_iter = as_completed(future_dict.values(), with_results=True)
for future, result in result_iter:
frame_key = rev_future_dict[future]
# get the writer for this specific video and add a new frame
w = writers[frame_key]
w.append_data(result)
这工作和我的实际代码从上面的重组,而写当前帧所以有一些好处,我认为提交下一帧。我想如果用户说:“我要处理X展架在一个时间”,所以我送50帧,写50帧,发送50个帧,写50帧等的溶液
这方面的工作了一段时间后,我的问题:
result
的数据住在本地内存?当迭代器返回或结束的时候吗?get_client()
其次Client()
上获得客户(如果不是由用户提供)的ValueError
的首选方式?注:这是一种扩展到this issue,我前一阵子做的,但略有不同。
下面有很多here我得到了以下的例子中后:
try:
# python 3
from queue import Queue
except ImportError:
# python 2
from Queue import Queue
from threading import Thread
def load_data(frame_gen, q):
for frame_arrays in frame_gen:
future_list = client.compute(frame_arrays)
for frame_key, arr_future in zip(frame_keys, future_list):
q.put({frame_key: arr_future})
q.put(None)
input_q = Queue(batch_size if batch_size is not None else 1)
load_thread = Thread(target=load_data, args=(frames_to_write, input_q,))
remote_q = client.gather(input_q)
load_thread.start()
while True:
future_dict = remote_q.get()
if future_dict is None:
break
# write the current frame
# this should only be one element in the dictionary, but this is
# also the easiest way to get access to the data
for frame_key, result in future_dict.items():
# frame_key = rev_future_dict[future]
w = writers[frame_key]
w.append_data(result)
input_q.task_done()
load_thread.join()
这回答我的大部分问题,我已经和似乎工作我想一般的方式。