如何跨Dask分布式工作器共享大型只读对象

问题描述 投票:0回答:1

问题

我正在尝试通过apply()发送一个2GB的CPython只读对象(可以被腌制)来分配给分布式工作者。这最终会为进程/线程(14+ GB)消耗大量内存。

有没有办法只将对象加载到内存中并让工作者同时使用该对象?

关于这个问题的更多细节

我有2个Dask系列Source_list和Pattern_list,分别包含7百万和3百万个字符串。我正在尝试从Pattern_list(3M)中找到Source_list(7M)中的所有子字符串匹配项。

为了加快子字符串搜索,我使用pyahocorasick包从Pattern_list创建一个Cpython数据结构(一个类对象)(该对象是可选择的)。

我尝试过的事情

  1. 使用单个dask调度程序运行大约需要2.5小时来处理,但结束时会得到正确的结果。
  2. 运行与dask分布正常导致:
distributed.worker - WARNING - Memory use is high but worker has no data to 
store to disk. Perhaps some other process is leaking memory? Process memory:  
2.85 GB -- Worker memory limit: 3.00 GB
  1. 运行带有内存限制的dask增加到8GB / 16GB: 主题 distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 14.5 GB -- Worker memory limit: 16.00 GB distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting 流程需要超过2.5小时的处理时间,我从未见过它完成(在取消前将其运行8小时以上)。它还消耗10 GB以上的内存
  2. 使用矢量化字符串操作Source_list.str.find_all(Pattern_list)需要超过2.5小时。
  3. 将对象存储在全局变量中并调用它会导致与进程和线程的第3点相同的错误。
  4. 在Source_list上使用map_partitions + loop / map可得到与第3点相同的结果。

Dask分布式代码

# OS = Windows 10
# RAM = 16 GB
# CPU cores = 8
# dask version 1.1.1

import dask.dataframe as dd
import ahocorasick
from dask.distributed import Client, progress

def create_ahocorasick_trie(pattern_list):
    A = ahocorasick.Automaton()
    for index, item in pattern_list.iteritems():
         A.add_word(item,item)
    A.make_automaton()
    return A 

if __name__ == '__main__':
    client = Client(memory_limit="12GB",processes=False)

    # Using Threading, because, the large_object seems to get copied in memory 
    # for each process when processes = True

    Source_list = dd.read_parquet("source_list.parquet") 
    Pattern_list = dd.read_parquet("pattern_list.parquet")

    # Note: 'source_list.parquet' and 'pattern_list.parquet' are generated via dask

    large_object = create_ahocorasick_trie(Pattern_list)

    result = Source_list.apply(lambda source_text: {large_object.iter(source_text)}, meta=(None,'O'))

    # iter() is an ahocorasick Cpython method

    progress(result.head(10))

    client.close()




python python-multiprocessing dask concurrent.futures dask-distributed
1个回答
0
投票

简短的回答是将它包装在一个dask.delayed调用中

big = dask.delayed(big)
df.apply(func, extra=big)

Dask会根据需要移动它并将其视为自己的数据。话虽如此,它需要存在于每个工作人员身上,因此每个工作人员的RAM应该远远超过该工作人员所占用的RAM。 (至少4倍左右)。

© www.soinside.com 2019 - 2024. All rights reserved.