多进程 Pandas Group 在进程启动后无法执行任何操作

问题描述 投票:0回答:1

我有一个相当大的 pandas 数据框,大约有 600,000 行 x 50 列。我想执行 groupby.agg(custom_function) 来获取结果数据。 custom_function 是一个函数,它获取系列中非空数据的第一个值,或者如果系列中的所有值都为 null,则返回 null。 (我的数据框按数据质量分层排序,唯一键的第一次出现具有最准确的数据,但如果第一次出现空数据,我想在第二次出现时获取值......依此类推。 )

我发现基本的 groupby.agg(custom_function) 语法很慢,所以我实现了多重处理来加快计算速度。当此代码应用于大约 10,000 行长的数据帧时,计算需要几秒钟,但是,当我尝试使用整个数据时,该过程似乎停滞了。多个进程启动,但内存和 CPU 使用率保持不变,什么也没有完成。

这是代码的麻烦部分:

# Create list of individual dataframes to feed map/multiprocess function
grouped = combined.groupby(['ID'])
grouped_list = [group for name, group in grouped]
length = len(grouped)

# Multi-process execute single pivot function
print('\nMulti-Process Pivot:')
with concurrent.futures.ProcessPoolExecutor() as executor:
    with tqdm.tqdm(total=length) as progress:
        futures = []
        for df in grouped_list:
            future = executor.submit(custom_function, df)
            future.add_done_callback(lambda p: progress.update())
            futures.append(future)
        results = []
        for future in futures:
            result = future.result()
            results.append(result)

我认为这个问题与多重处理有关(也许排队这么大的作业就是问题所在?)。我不明白为什么相当小的作业不会给这段代码带来任何问题,但增加输入数据的大小似乎会挂起它,而不仅仅是执行得更慢。如果有一种更有效的方法来获取每个唯一 ID 的每列中的第一个值,我很想听听。

感谢您的帮助。

pandas multiprocessing concurrent.futures
1个回答
0
投票

我建议在调用 future.result() 之前插入这段代码:

如果 future.exception() 不是 None: 引发 future.Exception()

就我而言,存在酸洗问题,因此多处理调用从未完成。不幸的是 future.result() 不会引发任何异常,因此您必须自己检查它,就像上面的代码一样。

© www.soinside.com 2019 - 2024. All rights reserved.