Python 如何一次读取 N 行

问题描述 投票:0回答:8

我正在编写一段代码,一次获取一个巨大的文本文件(几 GB)N 行,处理该批次,然后移至接下来的 N 行,直到完成整个文件。 (我不在乎最后一批的尺寸是否不完美)。

我一直在阅读有关使用 itertools islice 进行此操作的信息。我想我已经成功了一半:

from itertools import islice
N = 16
infile = open("my_very_large_text_file", "r")
lines_gen = islice(infile, N)

for lines in lines_gen:
     ...process my lines...

问题是我想处理下一批 16 行,但我遗漏了一些东西

python lines python-itertools
8个回答
81
投票

islice()
可用于获取迭代器的下一个
n
项。因此,
list(islice(f, n))
将返回文件
n
接下来的
f
行的列表。在循环中使用它将为您提供
n
行块中的文件。在文件末尾,列表可能会更短,最后调用将返回一个空列表。

from itertools import islice
with open(...) as f:
    while True:
        next_n_lines = list(islice(f, n))
        if not next_n_lines:
            break
        # process next_n_lines

另一种方法是使用 grouper 模式:

from itertools import zip_longest
with open(...) as f:
    for next_n_lines in zip_longest(*[f] * n):
        # process next_n_lines

11
投票

这个问题似乎假设通过一次读取 N 行块中的“巨大文本文件”可以提高效率。这在已经高度优化的

stdio
库上增加了一个应用程序缓冲层,增加了复杂性,并且可能绝对不会给你带来任何好处。

因此:

with open('my_very_large_text_file') as f:
    for line in f:
        process(line)

在时间、空间、复杂性和可读性方面可能优于任何替代方案。

另请参阅Rob Pike 的前两条规则Jackson 的两条规则PEP-20 Python 之禅。如果你真的只是想玩

islice
你应该忽略大文件。


3
投票

这是使用 groupby 的另一种方法:

from itertools import count, groupby

N = 16
with open('test') as f:
    for g, group in groupby(f, key=lambda _, c=count(): c.next()/N):
        print list(group)

工作原理:

基本上 groupby() 将根据关键参数的返回值对行进行分组,关键参数是 lambda 函数

lambda _, c=count(): c.next()/N
并使用 c 参数将在 时绑定到 count() 的事实函数将被定义,因此每次
groupby()
都会调用 lambda 函数并评估返回值以确定对行进行分组的分组器:

# 1 iteration.
c.next() => 0
0 / 16 => 0
# 2 iteration.
c.next() => 1
1 / 16 => 0
...
# Start of the second grouper.
c.next() => 16
16/16 => 1   
...

2
投票

由于添加了从文件中选择的行具有统计均匀分布的要求,因此我提供了这种简单的方法。

"""randsamp - extract a random subset of n lines from a large file"""

import random

def scan_linepos(path):
    """return a list of seek offsets of the beginning of each line"""
    linepos = []
    offset = 0
    with open(path) as inf:     
        # WARNING: CPython 2.7 file.tell() is not accurate on file.next()
        for line in inf:
            linepos.append(offset)
            offset += len(line)
    return linepos

def sample_lines(path, linepos, nsamp):
    """return nsamp lines from path where line offsets are in linepos"""
    offsets = random.sample(linepos, nsamp)
    offsets.sort()  # this may make file reads more efficient

    lines = []
    with open(path) as inf:
        for offset in offsets:
            inf.seek(offset)
            lines.append(inf.readline())
    return lines

dataset = 'big_data.txt'
nsamp = 5
linepos = scan_linepos(dataset) # the scan only need be done once

lines = sample_lines(dataset, linepos, nsamp)
print 'selecting %d lines from a file of %d' % (nsamp, len(linepos))
print ''.join(lines)

我在磁盘上包含 1.7GB 的 300 万行模拟数据文件上对其进行了测试。在我不太热的桌面上,

scan_linepos
占据了运行时间大约 20 秒。

只是为了检查

sample_lines
的性能,我照此使用了
timeit
模块

import timeit
t = timeit.Timer('sample_lines(dataset, linepos, nsamp)', 
        'from __main__ import sample_lines, dataset, linepos, nsamp')
trials = 10 ** 4
elapsed = t.timeit(number=trials)
print u'%dk trials in %.2f seconds, %.2fµs per trial' % (trials/1000,
        elapsed, (elapsed/trials) * (10 ** 6))

对于

nsamp
的各种值;当
nsamp
为 100 时,单个
sample_lines
在 460μs 内完成,并以每次调用 47ms 的速度线性扩展至 10k 个样本。

自然的下一个问题是随机根本不是随机的?,答案是“亚加密,但对于生物信息学来说肯定没问题”。


1
投票

使用了来自 What is the most “pythonic” way to iterate over a list in chunks?:

的 chunker 函数
from itertools import izip_longest

def grouper(iterable, n, fillvalue=None):
    "grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
    args = [iter(iterable)] * n
    return izip_longest(*args, fillvalue=fillvalue)


with open(filename) as f:
    for lines in grouper(f, chunk_size, ""): #for every chunk_sized chunk
        """process lines like 
        lines[0], lines[1] , ... , lines[chunk_size-1]"""

0
投票

假设“batch”意味着要一次处理所有 16 个记录而不是单独处理,一次读取文件中的一条记录并更新计数器;当计数器达到 16 时,处理该组。

interim_list = []
infile = open("my_very_large_text_file", "r")
ctr = 0
for rec in infile:
    interim_list.append(rec)
    ctr += 1
    if ctr > 15:
        process_list(interim_list)
        interim_list = []
        ctr = 0

the final group

process_list(interim_list)


0
投票

另一种解决方案可能是创建一个迭代器来生成 n 个元素的列表:

def n_elements(n, it):
    try:
        while True:
            yield [next(it) for j in range(0, n)]
    except StopIteration:
        return

with open(filename, 'rt') as f:
    for n_lines in n_elements(n, f):
        do_stuff(n_lines)


0
投票

使用团块:

from dollop.file import serve

file_name = 'my_very_large_text_file'

for lines in serve(file_name, N, mode='lines'):
    process_lines(lines)
© www.soinside.com 2019 - 2024. All rights reserved.