具有多个参数的 Dask 映射方法

问题描述 投票:0回答:1

我想将

Client.map
方法应用于使用多个参数的函数,就像
Pool.starmap
multiprocessing
方法一样。这是一个例子

from contextlib import contextmanager

from dask.distributed import Client


@contextmanager
def dask_client(**kwargs):
    """some docs"""
    kwargs.setdefault("ip", "localhost:8786")
    client = Client(**kwargs)

    try:
        yield client
    except Exception:
        raise
    finally:
        client.close()


def f(x,y,z):
    return x+y+z
# Dummy function

if __name__ == "__main__":
    with dask_client() as client:
        client.map(f, (1,2,3), (1,2,3))

distributed.worker - WARNING - Compute Failed
Function:  f
args:      (1, 1)
kwargs:    {}
Exception: TypeError("f() missing 1 required positional argument: 'z'")

distributed.worker - WARNING - Compute Failed
Function:  f
args:      (2, 2)
kwargs:    {}
Exception: TypeError("f() missing 1 required positional argument: 'z'")

distributed.worker - WARNING - Compute Failed
Function:  f
args:      (3, 3)
kwargs:    {}
Exception: TypeError("f() missing 1 required positional argument: 'z'")

这是已接受的答案这里

我知道每个元组都被视为我的函数

x
f
。如果可能的话我不想要这样的解决方案

def f(var_list):
    # could be sum(), but this is a dummy example
    return var_list[0] + var_list[1] + var_list[2]

python python-3.x dask dask-distributed
1个回答
4
投票

你已经很接近了,请注意,迭代器的数量应该与函数中的参数相同:

from dask.distributed import Client
client = Client()

def f(x,y,z):
    return x+y+z

futs = client.map(f, *[(1,2,3), (4,5,6), (7,8,9)])

client.gather(futs) # [12, 15, 18]

从评论看来你想将所有参数存储在一个元组中,在这种情况下你可以这样做:

# will pass them as x=1, y=2, z=3
long_list = [(1,2,3), (4,5,6), (7,8,9), (10,11,12)]

futs = client.map(f, *zip(*long_list))

client.gather(futs) # [6, 15, 24, 33]
© www.soinside.com 2019 - 2024. All rights reserved.