提交DASK阵列分布式客户端同时使用效果的同时

问题描述 投票:0回答:1

我有一个表示视频的帧DASK阵列以及希望创建多个视频文件。我使用的imageio库,让我到帧“追加”至FFmpeg子。所以,我可能有这样的事情:

my_frames = [[arr1f1, arr1f2, arr1f3], [arr2f1, arr2f2, arr2f3], ...]

因此,每个内部列表表示一个视频(或产品)的帧。我正在寻找发送/提交帧的最佳途径,同时也写入帧imageio,因为他们完成(按顺序)来计算。为了使问题更加复杂高于内部列表实际上是发电机,可以是100秒或帧的1000。也请记住,因为imageio是如何工作的,我认为它需要在一个单一的过程中存在。这是我到目前为止工作的简化版本:

for frame_arrays in frames_to_write:
    # 'frame_arrays' is [arr1f1, arr2f1, arr3f1, ...]
    future_list = _client.compute(frame_arrays)
    # key -> future
    future_dict = dict(zip(frame_keys, future_list))

    # write the current frame
    # future -> key
    rev_future_dict = {v: k for k, v in future_dict.items()}
    result_iter = as_completed(future_dict.values(), with_results=True)
    for future, result in result_iter:
        frame_key = rev_future_dict[future]
        # get the writer for this specific video and add a new frame
        w = writers[frame_key]
        w.append_data(result)

这工作和我的实际代码从上面的重组,而写当前帧所以有一些好处,我认为提交下一帧。我想如果用户说:“我要处理X展架在一个时间”,所以我送50帧,写50帧,发送50个帧,写50帧等的溶液

这方面的工作了一段时间后,我的问题:

  1. 什么时候result的数据住在本地内存?当迭代器返回或结束的时候吗?
  2. 是否有可能做这样的事情与DASK内核线程调度器,因此用户不必已分发安装?
  3. 是否有可能适应多少帧基础上的工人数量被发送?
  4. 有没有送DASK阵列和/或使用与“frame_key” as_completed的字典所包含的方法吗?
  5. 如果我加载整个帧序列,并提交给了客户机/集群,我可能会杀了调度权?
  6. 使用get_client()其次Client()上获得客户(如果不是由用户提供)的ValueError的首选方式?
  7. 是否有可能给DASK /分布一个或多个迭代器,它从拉工人变得可用?
  8. 我是不是哑巴?这过于复杂?

注:这是一种扩展到this issue,我前一阵子做的,但略有不同。

python dask dask-distributed
1个回答
0
投票

下面有很多here我得到了以下的例子中后:

    try:
        # python 3
        from queue import Queue
    except ImportError:
        # python 2
        from Queue import Queue
    from threading import Thread

    def load_data(frame_gen, q):
        for frame_arrays in frame_gen:
            future_list = client.compute(frame_arrays)
            for frame_key, arr_future in zip(frame_keys, future_list):
                q.put({frame_key: arr_future})
        q.put(None)

    input_q = Queue(batch_size if batch_size is not None else 1)
    load_thread = Thread(target=load_data, args=(frames_to_write, input_q,))
    remote_q = client.gather(input_q)
    load_thread.start()

    while True:
        future_dict = remote_q.get()
        if future_dict is None:
            break

        # write the current frame
        # this should only be one element in the dictionary, but this is
        # also the easiest way to get access to the data
        for frame_key, result in future_dict.items():
            # frame_key = rev_future_dict[future]
            w = writers[frame_key]
            w.append_data(result)
        input_q.task_done()

    load_thread.join()

这回答我的大部分问题,我已经和似乎工作我想一般的方式。

© www.soinside.com 2019 - 2024. All rights reserved.