如何使用多重处理从python中的数据帧创建gzip文件

问题描述 投票:0回答:1

我有一个正在变得IO约束的过程,在该过程中,我将数据库中的大型数据集提取到pandas数据框中,然后尝试逐行进行一些处理,然后坚持到gzip文件。我正在尝试找到一种使用多重处理的方法,以便能够将gzip的创建分成多个进程,然后将它们合并到一个文件中。或者并行处理而不覆盖先前的线程。我找到了此软件包p_tqdm,但是我遇到了EOF问题,可能是因为线程相互覆盖了。这是我当前解决方案的示例:

from p_tqdm import p_map

df = pd.read_sql(some_sql, engine)
things =[]
for index, row in df.iterrows():
    things.append(row)    
p_map(process, things)

def process():
    with gzip.open("final.gz", "wb") as f:
        value = do_somthing(row)
        f.write(value.encode())
arrays python-3.x multithreading
1个回答
0
投票

我不知道p_tqdm,但是如果我理解您的问题,可以很容易地使用multiprocessing完成。

类似的东西

import multiprocessing

def process(row):
    # take care that "do_somthing" must return class with encode() method (e.g. string)
    return do_somthing(row)

df = pd.read_sql(some_sql, engine)
things =[]
for index, row in df.iterrows():
    things.append(row)


with gzip.open("final.gz", "wb") as f, multiprocessing.Pool() as pool:
    for processed_row in pool.imap(process, things):
        f.write(processed_row.encode())

仅几条旁注:

  • 熊猫iterrows方法很慢-尽可能避免(请参阅Does pandas iterrows have performance issues?)。

  • 此外,您无需创建things,只需将可迭代传递给imap(即使直接传递df.iterrows()也应是可能的),为您节省一些内存。

  • 最后,由于似乎您正在读取sql数据,为什么不直接连接到db并遍历SELECT ...查询中的游标,完全跳过pandas

© www.soinside.com 2019 - 2024. All rights reserved.