我希望在完成后为未来添加一个回调。
根据文件:
回调完成后调用将来的回调。
回调fn应该将未来作为唯一的参数。无论将来是成功完成,错误还是取消,都将调用此方法。
回调在单独的线程中执行。
这并没有为我提供我所需要的东西,因为要求回调fn将未来作为唯一的参数。
这是我要做的事情的示例部分代码:
def method(cu_device_id):
print("Hello world, I'm going to use GPU %i" % cu_device_id)
def callback_fn(cu_device_id)
gpu_queue.put(cu_device_id)
cu_device_id = gpu_queue.get()
future = client.submit(method, cu_device_id)
#gpu_queue.put(cu_device_id) # Does not work, clients will shortly end up piled onto the slowest GPU
result.add_done_callback(callback_fn) # Crash / no way to pass in cu_device_id
这里的想法是让客户端从队列中获取可用的GPU,然后一旦完成使用它,将其重新放入队列,以便其他客户端可以使用它。
解决这个问题的一种方法是将gpu_queue传递给客户端:
def method(gpu_queue):
cu_device_id = gpu_queue.get()
print("Hello world, I'm going to use GPU %i" % cu_device_id)
gpu_queue.put(cu_device_id)
future = client.submit(method, gpu_queue)
事情像这样按预期工作。但我更喜欢能够从外面做到这一点我错过了什么或者没有看到让这项工作成功?
谢谢
您也可以考虑使用as_completed
迭代器在客户端处理此问题
data = iter(data)
futures = []
using_gpu = {}
for i in range(n_gpus):
future = client.submit(process, next(data), use_gpu=i)
using_gpu[future] = i
seq = as_completed(futures)
for future in seq:
gpu = using_gpu.pop(future)
new = client.submit(process, next(data), use_gpu=gpu) # TODO: handle end of data sequence gracefully
using_gpu[new] = gpu
seq.add(new) # add this into the sequence