我正在编写一段代码,一次获取一个巨大的文本文件(几 GB)N 行,处理该批次,然后移至接下来的 N 行,直到完成整个文件。 (我不在乎最后一批的尺寸是否不完美)。
我一直在阅读有关使用 itertools islice 进行此操作的信息。我想我已经成功了一半:
from itertools import islice
N = 16
infile = open("my_very_large_text_file", "r")
lines_gen = islice(infile, N)
for lines in lines_gen:
...process my lines...
问题是我想处理下一批 16 行,但我遗漏了一些东西
islice()
可用于获取迭代器的下一个 n
项。因此,list(islice(f, n))
将返回文件n
接下来的f
行的列表。在循环中使用它将为您提供 n
行块中的文件。在文件末尾,列表可能会更短,最后调用将返回一个空列表。
from itertools import islice
with open(...) as f:
while True:
next_n_lines = list(islice(f, n))
if not next_n_lines:
break
# process next_n_lines
另一种方法是使用 grouper 模式:
from itertools import zip_longest
with open(...) as f:
for next_n_lines in zip_longest(*[f] * n):
# process next_n_lines
这个问题似乎假设通过一次读取 N 行块中的“巨大文本文件”可以提高效率。这在已经高度优化的
stdio
库上增加了一个应用程序缓冲层,增加了复杂性,并且可能绝对不会给你带来任何好处。
因此:
with open('my_very_large_text_file') as f:
for line in f:
process(line)
在时间、空间、复杂性和可读性方面可能优于任何替代方案。
另请参阅Rob Pike 的前两条规则、Jackson 的两条规则和PEP-20 Python 之禅。如果你真的只是想玩
islice
你应该忽略大文件。
这是使用 groupby 的另一种方法:
from itertools import count, groupby
N = 16
with open('test') as f:
for g, group in groupby(f, key=lambda _, c=count(): c.next()/N):
print list(group)
工作原理:
基本上 groupby() 将根据关键参数的返回值对行进行分组,关键参数是 lambda 函数
lambda _, c=count(): c.next()/N
并使用 c 参数将在 时绑定到 count() 的事实函数将被定义,因此每次 groupby()
都会调用 lambda 函数并评估返回值以确定对行进行分组的分组器:
# 1 iteration.
c.next() => 0
0 / 16 => 0
# 2 iteration.
c.next() => 1
1 / 16 => 0
...
# Start of the second grouper.
c.next() => 16
16/16 => 1
...
由于添加了从文件中选择的行具有统计均匀分布的要求,因此我提供了这种简单的方法。
"""randsamp - extract a random subset of n lines from a large file"""
import random
def scan_linepos(path):
"""return a list of seek offsets of the beginning of each line"""
linepos = []
offset = 0
with open(path) as inf:
# WARNING: CPython 2.7 file.tell() is not accurate on file.next()
for line in inf:
linepos.append(offset)
offset += len(line)
return linepos
def sample_lines(path, linepos, nsamp):
"""return nsamp lines from path where line offsets are in linepos"""
offsets = random.sample(linepos, nsamp)
offsets.sort() # this may make file reads more efficient
lines = []
with open(path) as inf:
for offset in offsets:
inf.seek(offset)
lines.append(inf.readline())
return lines
dataset = 'big_data.txt'
nsamp = 5
linepos = scan_linepos(dataset) # the scan only need be done once
lines = sample_lines(dataset, linepos, nsamp)
print 'selecting %d lines from a file of %d' % (nsamp, len(linepos))
print ''.join(lines)
我在磁盘上包含 1.7GB 的 300 万行模拟数据文件上对其进行了测试。在我不太热的桌面上,
scan_linepos
占据了运行时间大约 20 秒。
只是为了检查
sample_lines
的性能,我照此使用了 timeit
模块
import timeit
t = timeit.Timer('sample_lines(dataset, linepos, nsamp)',
'from __main__ import sample_lines, dataset, linepos, nsamp')
trials = 10 ** 4
elapsed = t.timeit(number=trials)
print u'%dk trials in %.2f seconds, %.2fµs per trial' % (trials/1000,
elapsed, (elapsed/trials) * (10 ** 6))
对于
nsamp
的各种值;当 nsamp
为 100 时,单个 sample_lines
在 460μs 内完成,并以每次调用 47ms 的速度线性扩展至 10k 个样本。
自然的下一个问题是随机根本不是随机的?,答案是“亚加密,但对于生物信息学来说肯定没问题”。
使用了来自 What is the most “pythonic” way to iterate over a list in chunks?:
的 chunker 函数from itertools import izip_longest
def grouper(iterable, n, fillvalue=None):
"grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return izip_longest(*args, fillvalue=fillvalue)
with open(filename) as f:
for lines in grouper(f, chunk_size, ""): #for every chunk_sized chunk
"""process lines like
lines[0], lines[1] , ... , lines[chunk_size-1]"""
假设“batch”意味着要一次处理所有 16 个记录而不是单独处理,一次读取文件中的一条记录并更新计数器;当计数器达到 16 时,处理该组。
interim_list = [] infile = open("my_very_large_text_file", "r") ctr = 0 for rec in infile: interim_list.append(rec) ctr += 1 if ctr > 15: process_list(interim_list) interim_list = [] ctr = 0
the final group
process_list(interim_list)
另一种解决方案可能是创建一个迭代器来生成 n 个元素的列表:
def n_elements(n, it):
try:
while True:
yield [next(it) for j in range(0, n)]
except StopIteration:
return
with open(filename, 'rt') as f:
for n_lines in n_elements(n, f):
do_stuff(n_lines)
使用团块:
from dollop.file import serve
file_name = 'my_very_large_text_file'
for lines in serve(file_name, N, mode='lines'):
process_lines(lines)