我们有一个包含大约 4600 万条 CSV 格式记录的文件。每条记录大约有 18 个字段,其中一个是 64 字节的 ID。我们还有另一个文件,其中包含大约 167K 个唯一 ID。 ID对应的记录需要被抽出。因此,我们编写了一个 python 程序,将 167K ID 读入数组并处理 4600 万条记录文件,检查这些记录中是否存在 ID。这是代码片段:
import csv
...
csvReadHandler = csv.reader(inputFile, delimiter=chr(1))
csvWriteHandler = csv.writer(outputFile, delimiter=chr(1), lineterminator='\n')
for fieldAry in csvReadHandler:
lineCounts['orig'] += 1
if fieldAry[CUSTOMER_ID] not in idArray:
csvWriteHandler.writerow(fieldAry)
lineCounts['mod'] += 1
在一小组数据上测试程序,这里是处理时间:
lines: 117929 process time: 236.388447046 sec
lines: 145390 process time: 277.075321913 sec
我们已于昨晚 ~3:00am EST 开始在 4600 万条记录文件(约 13 GB 大小)上运行该程序,现在是美国东部时间上午 10 点左右,它仍在处理中!
问题:
if fieldAry[CUSTOMER_ID] not in idArray:
有更好的选择吗?
谢谢!
更新:这是在带有 EBS 附加卷的 EC2 实例上处理的。
你应该必须使用
set
而不是list
; before for
循环做:
idArray = set(idArray)
csvReadHandler = csv.reader(inputFile, delimiter=chr(1))
csvWriteHandler = csv.writer(outputFile, delimiter=chr(1), lineterminator='\n')
for fieldAry in csvReadHandler:
lineCounts['orig'] += 1
if fieldAry[CUSTOMER_ID] not in idArray:
csvWriteHandler.writerow(fieldAry)
lineCounts['mod'] += 1
看到令人难以置信的加速;你正在使用days的不必要的处理时间只是因为你选择了错误的数据结构。
in
的
set
运算符具有 O(1) 时间复杂度,而具有 list
的 O(n)时间复杂度。这可能听起来“没什么大不了的”,但实际上这是您脚本中的瓶颈。即使
set
对于那个 O 会有更高的常数。因此,您的代码在单个 in
操作上使用的时间比必要的多 30000 次。如果在最佳版本中它需要 30 秒,现在您仅在这条线上就花费了 10 天。
请看下面的测试:我生成了 100 万个 ID 并将 10000 个放到另一个列表中 -
to_remove
。然后我像你一样做一个for
循环,为每条记录做in
操作:
import random
import timeit
all_ids = [random.randint(1, 2**63) for i in range(1000000)]
to_remove = all_ids[:10000]
random.shuffle(to_remove)
random.shuffle(all_ids)
def test_set():
to_remove_set = set(to_remove)
for i in all_ids:
if i in to_remove_set:
pass
def test_list():
for i in all_ids:
if i in to_remove:
pass
print('starting')
print('testing list', timeit.timeit(test_list, number=1))
print('testing set', timeit.timeit(test_set, number=1))
结果:
testing list 227.91903045598883
testing set 0.14897623099386692
set
版本耗时149毫秒; list
版本需要 228 秒。现在这些都是小数字:在你的例子中,你有 5000 万条输入记录,而我有 100 万条;因此,您需要将 testing set
时间乘以 50:对于您的数据集,大约需要 7.5 秒。
另一方面,列表版本需要将时间乘以 50 * 17 - 不仅输入记录多 50 倍,而且要匹配的记录多 17 倍。因此我们得到 227 * 50 * 17 = 192950.
所以你的算法花了 2.2 天的时间来做一些使用正确的数据结构可以在 7.5 秒内完成的事情。当然,这并不意味着您可以在 7.5 秒内扫描整个 50 GB 的文档,但它可能不会超过 2.2 天。所以我们从:
2 days 2.2 days
|reading and writing the files||------- doing id in list ------|
类似
2 days 7.5 seconds (doing id in set)
|reading and writing the files||
加快速度的最简单方法是使用某些分布式解决方案并行处理线路。最简单的方法之一是使用 multiprocessing.Pool。你应该做这样的事情(语法未检查):
from multiprocessing import Pool
p = Pool(processes=4)
p.map(process_row, csvReadHandler)
尽管如此,python 并不是进行这种批处理的最佳语言(主要是因为写入磁盘非常慢)。最好将所有磁盘写入管理(缓冲、排队等)留给 linux 内核,因此使用 bash 解决方案会更好。最有效的方法是将输入文件分成块,然后简单地执行反向 grep 来过滤 ids。
for file in $list_of_splitted_files; then
cat $file | grep -v (id1|id2|id3|...) > $file.out
done;
如果您之后需要简单地合并:
for file in $(ls *.out); then
cat $file >> final_results.csv
done
注意事项:
我认为最好使用数据库来解决这个问题 首先创建一个数据库,比如 MySql 或其他任何东西 然后将文件中的数据写入 2 表 最后使用一个简单的 sql 查询来选择行 像这样的东西: select * from table1 where id in(select ids from table2)
最好是使用大数据方法。类似于 AWS EMR 上的 PySpark,它可以对数据进行批处理。一个体面的集群需要大约 1-2 小时来处理 700 万条记录。