Python高效处理~5000万条记录文件

问题描述 投票:0回答:4

我们有一个包含大约 4600 万条 CSV 格式记录的文件。每条记录大约有 18 个字段,其中一个是 64 字节的 ID。我们还有另一个文件,其中包含大约 167K 个唯一 ID。 ID对应的记录需要被抽出。因此,我们编写了一个 python 程序,将 167K ID 读入数组并处理 4600 万条记录文件,检查这些记录中是否存在 ID。这是代码片段:

import csv
...
csvReadHandler = csv.reader(inputFile, delimiter=chr(1))
csvWriteHandler = csv.writer(outputFile, delimiter=chr(1), lineterminator='\n')
for fieldAry in csvReadHandler:
    lineCounts['orig'] += 1
    if fieldAry[CUSTOMER_ID] not in idArray:
        csvWriteHandler.writerow(fieldAry)
        lineCounts['mod'] += 1

在一小组数据上测试程序,这里是处理时间:

lines: 117929 process time: 236.388447046 sec
lines: 145390 process time: 277.075321913 sec

我们已于昨晚 ~3:00am EST 开始在 4600 万条记录文件(约 13 GB 大小)上运行该程序,现在是美国东部时间上午 10 点左右,它仍在处理中!

问题:

  1. 是否有更好的方法来处理这些记录以缩短处理时间?
  2. python 是正确的选择吗? awk 或其他工具会有帮助吗?
  3. 我猜测在以下语句中对 167K 数组进行 64 字节 ID 查找是罪魁祸首:
     if fieldAry[CUSTOMER_ID] not in idArray: 

有更好的选择吗?

谢谢!

更新:这是在带有 EBS 附加卷的 EC2 实例上处理的。

python arrays csv lookup latency
4个回答
12
投票

应该必须使用

set
而不是
list
before
for
循环做:

idArray = set(idArray)

csvReadHandler = csv.reader(inputFile, delimiter=chr(1))
csvWriteHandler = csv.writer(outputFile, delimiter=chr(1), lineterminator='\n')
for fieldAry in csvReadHandler:
    lineCounts['orig'] += 1
    if fieldAry[CUSTOMER_ID] not in idArray:
        csvWriteHandler.writerow(fieldAry)
        lineCounts['mod'] += 1

看到令人难以置信的加速;你正在使用days的不必要的处理时间只是因为你选择了错误的数据结构。


具有

in

set
运算符具有 O(1) 时间复杂度,而具有 list
O(n)
时间复杂度。这可能听起来“没什么大不了的”,但实际上这是您脚本中的瓶颈。即使
set
对于那个 O 会有更高的常数。因此,您的代码在单个
in
操作上使用的时间比必要的多 30000 次。如果在最佳版本中它需要 30 秒,现在您仅在这条线上就花费了 10 天。

请看下面的测试:我生成了 100 万个 ID 并将 10000 个放到另一个列表中 -

to_remove
。然后我像你一样做一个
for
循环,为每条记录做
in
操作:

import random
import timeit

all_ids = [random.randint(1, 2**63) for i in range(1000000)]
to_remove = all_ids[:10000]
random.shuffle(to_remove)
random.shuffle(all_ids)


def test_set():
    to_remove_set = set(to_remove)
    for i in all_ids:
        if i in to_remove_set:
            pass

def test_list():
    for i in all_ids:
        if i in to_remove:
            pass


print('starting')
print('testing list', timeit.timeit(test_list, number=1))
print('testing set', timeit.timeit(test_set, number=1))

结果:

testing list 227.91903045598883
testing set 0.14897623099386692

set
版本耗时149毫秒;
list
版本需要 228 秒。现在这些都是小数字:在你的例子中,你有 5000 万条输入记录,而我有 100 万条;因此,您需要将
testing set
时间乘以 50:对于您的数据集,大约需要 7.5 秒。

另一方面,列表版本需要将时间乘以 50 * 17 - 不仅输入记录多 50 倍,而且要匹配的记录多 17 倍。因此我们得到 227 * 50 * 17 = 192950.

所以你的算法花了 2.2 天的时间来做一些使用正确的数据结构可以在 7.5 秒内完成的事情。当然,这并不意味着您可以在 7.5 秒内扫描整个 50 GB 的文档,但它可能不会超过 2.2 天。所以我们从:

             2 days                           2.2 days 
 |reading and writing the files||------- doing id in list ------|

类似

             2 days            7.5 seconds (doing id in set)
 |reading and writing the files||

0
投票

加快速度的最简单方法是使用某些分布式解决方案并行处理线路。最简单的方法之一是使用 multiprocessing.Pool。你应该做这样的事情(语法未检查):

from multiprocessing import Pool

p = Pool(processes=4)
p.map(process_row, csvReadHandler)

尽管如此,python 并不是进行这种批处理的最佳语言(主要是因为写入磁盘非常慢)。最好将所有磁盘写入管理(缓冲、排队等)留给 linux 内核,因此使用 bash 解决方案会更好。最有效的方法是将输入文件分成块,然后简单地执行反向 grep 来过滤 ids。

for file in $list_of_splitted_files; then
  cat $file | grep -v (id1|id2|id3|...) > $file.out
done;

如果您之后需要简单地合并:

for file in $(ls *.out); then
  cat $file >> final_results.csv
done

注意事项:

  • 不知道对所有 id 进行一次 grep 是否更多/更少 比遍历所有 id 并执行单个 id 更有效 grep.
  • 编写并行解决方案时,尝试读/写不同的 文件以最小化 I/O 瓶颈(所有线程试图写入 同一个文件)在所有语言中。
  • 为每个处理部分在代码中放置计时器。这样你就会看到哪个部分浪费了更多时间。我真的推荐这个,因为我有一个类似的程序要编写,我认为处理部分是瓶颈(类似于您与 ids 向量的比较)但实际上是 I/O 拖累了所有执行。

0
投票

我认为最好使用数据库来解决这个问题 首先创建一个数据库,比如 MySql 或其他任何东西 然后将文件中的数据写入 2 表 最后使用一个简单的 sql 查询来选择行 像这样的东西: select * from table1 where id in(select ids from table2)


0
投票

最好是使用大数据方法。类似于 AWS EMR 上的 PySpark,它可以对数据进行批处理。一个体面的集群需要大约 1-2 小时来处理 700 万条记录。

© www.soinside.com 2019 - 2024. All rights reserved.