我正在使用 Python 处理一个大型 CSV 文件(超过 10M 行),对每一行运行一个计算操作,然后将结果保存在一个新的 CSV 文件中。我尝试在 4 个进程的池中运行该程序(我的服务器上的 CPU 内核数量相同),它在前 15 分钟保持在 100% 时运行良好,但随后它下降到 60% 以提醒执行。
下图展示了这个过程。每个输入文件大约运行 70 分钟,但它仅在 15 分钟内达到 100% 的 CPU 使用率峰值。我该怎么做才能加快速度并将其保持在 100%?
代码如下。资源繁重的功能是
doBigCalculation
(计算并保存到另一个CSV),所以我将其中的2000个请求保存在一个列表中,然后将它们发送到池中进行处理。
p = Pool(cpu_count())
with open('input.csv', newline='') as inputFile:
reader = csv.reader(inputFile, delimiter=',', quotechar='"')
for row in reader:
param1 = float(row[0])
param2 = float(row[1])
# This is the resource heavy task
aqiRequest = AQIRequest(param1, param2)
aqiRequests.append(aqiRequest)
if(len(aqiRequests) % 2000 == 0):
p.map(doBigCalculation, aqiRequests)
aqiRequests = []
像这样的任务的表现通常是一点点的跟踪、错误和分析。但第一个建议是不要自己分块请求。
map
具有内置的分块功能,可以更有效地执行此操作,您可以使用 chunksize=
参数对其进行调整。
还使用imap
允许您将读取重构为生成器,它与imap
的组合将延迟读取 csv,这应该减少 7000 万行的 ram 压力。重构代码如下。
from multiprocessing import Pool, cpu_count
def aqi_request_generator():
with open('input.csv', newline='') as inputFile:
reader = csv.reader(inputFile, delimiter=',', quotechar='"')
for row in reader:
param1 = float(row[0])
param2 = float(row[1])
# This is the resource heavy task
yield AQIRequest(param1, param2)
def compute_chunksize(iterable_size, pool_size):
chunksize, remainder = divmod(iterable_size, 4 * pool_size)
if remainder:
chunksize += 1
return chunksize
pool_size = cpu_count()
p = Pool(pool_size)
for result in p.imap(doBigCalculation,
aqi_request_generator(),
chunksize=compute_chunksize(10_000_000, pool_size)
):
# Do something with return value from doBugCalculation:
...
此外,如果有一些 IO(您提到写入输出 csv),您可能会发现通过拥有比 CPU 更多的进程可以获得更好的性能,因为您可能会花费大量时间来写入结果。 (这或许可以解释 CPU 性能出现峰值之后出现下降)