我有一个正在变得IO约束的过程,在该过程中,我将数据库中的大型数据集提取到pandas数据框中,然后尝试逐行进行一些处理,然后坚持到gzip文件。我正在尝试找到一种使用多重处理的方法,以便能够将gzip的创建分成多个进程,然后将它们合并到一个文件中。或者并行处理而不覆盖先前的线程。我找到了此软件包p_tqdm,但是我遇到了EOF问题,可能是因为线程相互覆盖了。这是我当前解决方案的示例:
from p_tqdm import p_map
df = pd.read_sql(some_sql, engine)
things =[]
for index, row in df.iterrows():
things.append(row)
p_map(process, things)
def process():
with gzip.open("final.gz", "wb") as f:
value = do_somthing(row)
f.write(value.encode())
我不知道p_tqdm
,但是如果我理解您的问题,可以很容易地使用multiprocessing
完成。
类似的东西
import multiprocessing
def process(row):
# take care that "do_somthing" must return class with encode() method (e.g. string)
return do_somthing(row)
df = pd.read_sql(some_sql, engine)
things =[]
for index, row in df.iterrows():
things.append(row)
with gzip.open("final.gz", "wb") as f, multiprocessing.Pool() as pool:
for processed_row in pool.imap(process, things):
f.write(processed_row.encode())
仅几条旁注:
熊猫iterrows
方法很慢-尽可能避免(请参阅Does pandas iterrows have performance issues?)。
此外,您无需创建things
,只需将可迭代传递给imap
(即使直接传递df.iterrows()也应是可能的),为您节省一些内存。
最后,由于似乎您正在读取sql数据,为什么不直接连接到db并遍历SELECT ...
查询中的游标,完全跳过pandas
。