I want to apply the Client.map method to a function that takes multiple arguments, the same way Pool.starmap works in multiprocessing. Here is an example:
from contextlib import contextmanager

from dask.distributed import Client


@contextmanager
def dask_client(**kwargs):
    """some docs"""
    kwargs.setdefault("ip", "localhost:8786")
    client = Client(**kwargs)
    try:
        yield client
    except Exception:
        raise
    finally:
        client.close()


# Dummy function
def f(x, y, z):
    return x + y + z


if __name__ == "__main__":
    with dask_client() as client:
        client.map(f, (1, 2, 3), (1, 2, 3))
distributed.worker - WARNING - Compute Failed
Function: f
args: (1, 1)
kwargs: {}
Exception: TypeError("f() missing 1 required positional argument: 'z'")
distributed.worker - WARNING - Compute Failed
Function: f
args: (2, 2)
kwargs: {}
Exception: TypeError("f() missing 1 required positional argument: 'z'")
distributed.worker - WARNING - Compute Failed
Function: f
args: (3, 3)
kwargs: {}
Exception: TypeError("f() missing 1 required positional argument: 'z'")
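The log shows the calls f(1, 1), f(2, 2), f(3, 3): client.map pairs up its iterables element by element, like Python's built-in map, instead of unpacking each tuple into one call the way starmap does. A dask-free sketch of the two conventions (show_args is a purely illustrative helper, not part of any library):

```python
from itertools import starmap


def f(x, y, z):
    return x + y + z


def show_args(*args):
    # Illustrative helper: returns the positional arguments it was called with.
    return args


# map takes one argument from each iterable per call, so two iterables
# of length 3 give three calls with only two arguments each.
print(list(map(show_args, (1, 2, 3), (1, 2, 3))))  # [(1, 1), (2, 2), (3, 3)]

# starmap (the convention Pool.starmap follows) unpacks each tuple
# into the full argument list of a single call.
print(list(starmap(f, [(1, 2, 3), (4, 5, 6)])))  # [6, 15]
```

Calling f with the pairs from the first print is exactly what raises "missing 1 required positional argument: 'z'" on the workers.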
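This is the accepted answer here. I understand that each tuple is being treated as the x argument of my function f. If possible, I would like to avoid a solution like this: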
def f(var_list):
    # could be sum(), but this is a dummy example
    return var_list[0] + var_list[1] + var_list[2]
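One way to keep f(x, y, z) unchanged and still submit one tuple per task is a small adapter that unpacks each tuple before calling f. The star helper below is hypothetical (it is not part of dask or the standard library). The sketch runs it through built-in map; the assumption is that client.map(star(f), long_list) behaves the same way, since distributed serializes functions, including closures, with cloudpickle.

```python
from functools import wraps


def f(x, y, z):
    return x + y + z


def star(func):
    """Hypothetical adapter: turn func(a, b, c) into a one-argument
    function that takes the tuple (a, b, c) and unpacks it."""
    @wraps(func)  # keep the original name, e.g. for task labels
    def wrapper(args):
        return func(*args)
    return wrapper


f_star = star(f)
long_list = [(1, 2, 3), (4, 5, 6)]
print(list(map(f_star, long_list)))  # [6, 15]
# Assumed dask equivalent (not run here): client.map(star(f), long_list)
```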
You're very close. Note that the number of iterables must match the number of arguments the function takes:
from dask.distributed import Client

client = Client()


def f(x, y, z):
    return x + y + z


futs = client.map(f, *[(1, 2, 3), (4, 5, 6), (7, 8, 9)])
client.gather(futs)  # [12, 15, 18]
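The executors in the standard library's concurrent.futures module share the same map(fn, *iterables) convention, so the call above can be checked locally without a dask cluster. A sketch using ThreadPoolExecutor as a stand-in for the dask client:

```python
from concurrent.futures import ThreadPoolExecutor


def f(x, y, z):
    return x + y + z


# Each tuple supplies one argument position, not one call:
# (1, 2, 3) -> x values, (4, 5, 6) -> y values, (7, 8, 9) -> z values,
# so the calls are f(1, 4, 7), f(2, 5, 8), f(3, 6, 9).
iterables = [(1, 2, 3), (4, 5, 6), (7, 8, 9)]

with ThreadPoolExecutor() as pool:
    # Executor.map yields results in input order.
    results = list(pool.map(f, *iterables))

print(results)  # [12, 15, 18]
```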
From the comments, it seems you want to keep all the arguments for one call together in a single tuple. In that case you can do this:
long_list = [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12)]
# each tuple becomes one call; the first is f(x=1, y=2, z=3)
futs = client.map(f, *zip(*long_list))
client.gather(futs)  # [6, 15, 24, 33]
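The trick is that zip(*long_list) transposes the list of per-call tuples into one tuple per argument position, which is the shape client.map expects. The transposition on its own, checked with built-in map:

```python
def f(x, y, z):
    return x + y + z


long_list = [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12)]

# zip(*rows) turns rows (one per call) into columns (one per parameter).
columns = list(zip(*long_list))
print(columns)  # [(1, 4, 7, 10), (2, 5, 8, 11), (3, 6, 9, 12)]

# Unpacking the columns gives map one iterable per parameter,
# so each original tuple becomes one call: f(1, 2, 3), f(4, 5, 6), ...
print(list(map(f, *columns)))  # [6, 15, 24, 33]
```

One caveat: zip stops at the shortest input, so if one tuple in long_list is shorter than the others, every column is silently truncated instead of raising an error.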