在一个dask包上依次迭代

问题描述 投票:1回答:1

我需要将非常大的dask.bag元素提交给非线程安全商店,即我需要类似的东西

for x in dbag:
    store.add(x)

我不能使用compute,因为袋子很大,以适应记忆。我需要更像distributed.as_completed的东西,但它适用于袋子,distributed.as_completed没有。

dask concurrent.futures dask-distributed
1个回答
1
投票

我可能会继续使用普通计算,但添加一个锁

def commit(x, lock=None):
    with lock:
        store.add(x)

b.map(commit, lock=my_lock)

根据您正在进行的处理类型,您可以在哪里创建threading.Lockmultiprocessing.Lock

如果你想使用as_completed,你可以将你的行李转换为期货并使用as_completed。

from distributed.client import futures_of, as_completed
b = b.persist()
futures = futures_of(b)

for future in as_completed(futures):
    for x in future.result():
        store.add(x)

您还可以转换为数据帧,我相信它会更明智地迭代

df = b.to_dataframe(...)
for x in df.iteritems(...):
    ...
© www.soinside.com 2019 - 2024. All rights reserved.