处理块中的大文件:与readline的不一致搜索

问题描述 投票:6回答:3

我试图用Python读取和处理大块文件。我正在关注this blog,它提出了一种非常快速的方法来读取和处理分布在多个进程中的大块数据。我只是稍微更新了现有代码,即使用stat(fin).st_size而不是os.path.getsize。在示例中,我还没有实现多处理,因为问题也在单个进程中表现出来。这使得调试更容易。

我对这段代码的问题在于它返回了破坏的句子。这是有道理的:指针不考虑行结尾,只返回一些给定的字节大小。在实践中,人们可以假设您可以通过省去所提取的批次行中的最后一项来解决这个问题,因为这很可能是虚线。不幸的是,这也无法可靠地工作。

from os import stat


def chunkify(pfin, buf_size=1024):
    file_end = stat(pfin).st_size
    with open(pfin, 'rb') as f:
        chunk_end = f.tell()

        while True:
            chunk_start = chunk_end
            f.seek(buf_size, 1)
            f.readline()
            chunk_end = f.tell()
            yield chunk_start, chunk_end - chunk_start

            if chunk_end > file_end:
                break


def process_batch(pfin, chunk_start, chunk_size):
    with open(pfin, 'r', encoding='utf-8') as f:
        f.seek(chunk_start)
        batch = f.read(chunk_size).splitlines()

    # changing this to batch[:-1] will result in 26 lines total
    return batch


if __name__ == '__main__':
    fin = r'data/tiny.txt'
    lines_n = 0
    for start, size in chunkify(fin):
        lines = process_batch(fin, start, size)
        # Uncomment to see broken lines
        # for line in lines:
        #    print(line)
        # print('\n')
        lines_n += len(lines)

    print(lines_n)
    # 29

上面的代码将打印29作为已处理行的总数。当你没有返回批次的最后一项时,天真地假设这是一条折线,你会得到26。实际行数为27.测试数据可在下面找到。

She returned bearing mixed lessons from a society where the tools of democracy still worked.
If you think you can sense a "but" approaching, you are right.
Elsewhere, Germany take on Brazil and Argentina face Spain, possibly without Lionel Messi.
What sort of things do YOU remember best?'
Less than three weeks after taking over from Lotz at Wolfsburg.
The buildings include the Dr. John Micallef Memorial Library.
For women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for breast cancer.
In one interview he claimed it was from the name of the Cornish language ("Kernewek").
8 Goldschmidt was out of office between 16 and 19 July 1970.
Last year a new law allowed police to shut any bar based on security concerns.
But, Frum explains: "Glenn Beck takes it into his head that this guy is bad news."
Carrying on the Romantic tradition of landscape painting.
This area has miles of undeveloped beach adjacent to the headlands.
The EAC was created in 2002 to help avoid a repeat of the disputed 2000 presidential election.
In May 1945, remnants of the German Army continue fight on in the Harz mountains, nicknamed "The Void" by American troops.
Dietler also said Abu El Haj was being opposed because she is of Palestinian descent.
The auction highlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disorder.
GAAP operating profit was $13.2 million and $7.1 million in the second quarter of 2008 and 2007, respectively.
Doc, Ira, and Rene are sent home as part of the seventh bond tour.
only I am sick of always hearing him called the Just.
Also there is Meghna River in the west of Brahmanbaria.
The explosives were the equivalent of more than three kilograms of dynamite - equal to 30 grenades," explained security advisor Markiyan Lubkivsky to reporters gathered for a news conference in Kyiv.
Her mother first took her daughter swimming at the age of three to help her with her cerebal palsy.
A U.S. aircraft carrier, the USS "Ticonderoga", was also stationed nearby.
Louis shocked fans when he unexpectedly confirmed he was expecting a child in summer 2015.
99, pp.
Sep 19: Eibar (h) WON 6-1

如果你打印出创建的行,你会发现确实会出现断句。我发现这很奇怪。不应该f.readline()确保文件被读取,直到下一行?在下面的输出中,空行分隔两个批次。这意味着您无法检查批处理中下一行的行,如果它是子字符串则将其删除 - 破坏的句子属于另一个批处理而不是完整句子。

...
This area has miles of undeveloped beach adjacent to the headlands.
The EAC was created in 2002 to help avoid a repeat of the disputed 2000 presidential election.
In May 1945, r


In May 1945, remnants of the German Army continue fight on in the Harz mountains, nicknamed "The Void" by American troops.
...

有没有办法摆脱这些破碎的句子,而不是删除太多?

您可以下载更大的测试文件(100,000行)here


经过大量的挖掘,似乎实际上一些无法访问的缓冲区导致了搜索的不一致行为,正如herehere所讨论的那样。我尝试了建议的解决方案,使用iter(f.readline, '')seek,但仍然给我不一致的结果。我更新了我的代码以在每批1500行之后返回文件指针,但实际上批次返回将重叠。

from os import stat
from functools import partial


def chunkify(pfin, max_lines=1500):
    file_end = stat(pfin).st_size
    with open(pfin, 'r', encoding='utf-8') as f:
        chunk_end = f.tell()

        for idx, l in enumerate(iter(f.readline, '')):
            if idx % max_lines == 0:
                chunk_start = chunk_end
                chunk_end = f.tell()
                # yield start position, size, and is_last
                yield chunk_start, chunk_end - chunk_start

    chunk_start = chunk_end
    yield chunk_start, file_end


def process_batch(pfin, chunk_start, chunk_size):
    with open(pfin, 'r', encoding='utf-8') as f:
        f.seek(chunk_start)
        chunk = f.read(chunk_size).splitlines()

    batch = list(filter(None, chunk))

    return batch


if __name__ == '__main__':
    fin = r'data/100000-ep+gutenberg+news+wiki.txt'

    process_func = partial(process_batch, fin)
    lines_n = 0

    prev_last = ''
    for start, size in chunkify(fin):
        lines = process_func(start, size)

        if not lines:
            continue

        # print first and last ten sentences of batch
        for line in lines[:10]:
            print(line)
        print('...')
        for line in lines[-10:]:
            print(line)
        print('\n')

        lines_n += len(lines)

    print(lines_n)

下面是重叠批次的示例。最后一批的前两个半句从之前批次的最后一句复制。我不知道如何解释或解决这个问题。

...
The EC ordered the SFA to conduct probes by June 30 and to have them confirmed by a certifying authority or it would deduct a part of the funding or the entire sum from upcoming EU subsidy payments.
Dinner for two, with wine, 250 lari.
It lies a few kilometres north of the slightly higher Weissmies and also close to the slightly lower Fletschhorn on the north.
For the rest we reached agreement and it was never by chance.
Chicago Blackhawks defeat Columbus Blue Jackets for 50th win
The only drawback in a personality that large is that no one els


For the rest we reached agreement and it was never by chance.
Chicago Blackhawks defeat Columbus Blue Jackets for 50th win
The only drawback in a personality that large is that no one else, whatever their insights or artistic pedigree, is quite as interesting.
Sajid Nadiadwala's reboot version of his cult classic "Judwaa", once again directed by David Dhawan titled "Judwaa 2" broke the dry spell running at the box office in 2017.
They warned that there will be a breaking point, although it is not clear what that would be.
...

除此之外,我还尝试从原始代码中删除readline,并跟踪剩余的不完整的块。然后将不完整的块传递给下一个块并添加到其前面。我现在遇到的问题是,因为文本是以字节块的形式读取的,所以可能会发生一个块结束而没有完全完成一个字符的字节。这个wille会导致解码错误。

from os import stat


def chunkify(pfin, buf_size=1024):
    file_end = stat(pfin).st_size
    with open(pfin, 'rb') as f:
        chunk_end = f.tell()

        while True:
            chunk_start = chunk_end
            f.seek(buf_size, 1)
            chunk_end = f.tell()
            is_last = chunk_end >= file_end
            # yield start position, size, and is_last
            yield chunk_start, chunk_end - chunk_start, is_last

            if is_last:
                break


def process_batch(pfin, chunk_start, chunk_size, is_last, leftover):
    with open(pfin, 'r', encoding='utf-8') as f:
        f.seek(chunk_start)
        chunk = f.read(chunk_size)

    # Add previous leftover to current chunk
    chunk = leftover + chunk
    batch = chunk.splitlines()
    batch = list(filter(None, batch))

    # If this chunk is not the last one,
    # pop the last item as that will be an incomplete sentence
    # We return this leftover to use in the next chunk
    if not is_last:
        leftover = batch.pop(-1)

    return batch, leftover


if __name__ == '__main__':
    fin = r'ep+gutenberg+news+wiki.txt'

    lines_n = 0
    left = ''
    for start, size, last in chunkify(fin):
        lines, left = process_batch(fin, start, size, last, left)

        if not lines:
            continue

        for line in lines:
            print(line)
        print('\n')

        numberlines = len(lines)
        lines_n += numberlines

    print(lines_n)

运行上面的代码,将不可避免地导致UnicodeDecodeError

Traceback (most recent call last):
  File "chunk_tester.py", line 46, in <module>
    lines, left = process_batch(fin, start, size, last, left)
  File "chunk_tester.py", line 24, in process_batch
    chunk = f.read(chunk_size)
  File "lib\codecs.py", line 322, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa9 in position 0: invalid start byte
python python-3.x file-handling seek
3个回答
2
投票

你真是太近了!对最终代码进行相对简单的更改(将数据读入bytes而不是str)会使所有(几乎)工作。

主要问题是因为从二进制文件读取计数字节,但是从文本文件读取计数文本,并且您在字节中进行了第一次计数,在字符中进行了第二次计数,从而导致您对已经读取的数据的错误进行了假设。它与内部隐藏缓冲区无关。

其他变化:

  • 代码需要在b'\n'上拆分而不是使用bytes.splitlines(),并且只在相关检测代码之后删除空行。
  • 除非文件的大小发生变化(在这种情况下你的现有代码会中断),chunkify可以被一个更简单,更快的循环所取代,它在功能上完全相同而不必保持文件打开。

这给出了最终的代码:

from os import stat

def chunkify(pfin, buf_size=1024**2):
    file_end = stat(pfin).st_size

    i = -buf_size
    for i in range(0, file_end - buf_size, buf_size):
        yield i, buf_size, False

    leftover = file_end % buf_size
    if leftover == 0:  # if the last section is buf_size in size
        leftover = buf_size
    yield i + buf_size, leftover, True

def process_batch(pfin, chunk_start, chunk_size, is_last, leftover):
    with open(pfin, 'rb') as f:
        f.seek(chunk_start)
        chunk = f.read(chunk_size)

    # Add previous leftover to current chunk
    chunk = leftover + chunk
    batch = chunk.split(b'\n')

    # If this chunk is not the last one,
    # pop the last item as that will be an incomplete sentence
    # We return this leftover to use in the next chunk
    if not is_last:
        leftover = batch.pop(-1)

    return [s.decode('utf-8') for s in filter(None, batch)], leftover


if __name__ == '__main__':
    fin = r'ep+gutenberg+news+wiki.txt'

    lines_n = 0
    left = b''
    for start, size, last in chunkify(fin):
        lines, left = process_batch(fin, start, size, last, left)

        if not lines:
            continue

        for line in lines:
            print(line)
        print('\n')

        numberlines = len(lines)
        lines_n += numberlines

    print(lines_n)

2
投票

你这里有一个有趣的问题。你有n进程给每个要处理的数据块的位置,但你不能提供块的确切位置,因为你正在处理行,你的位置是以字节为单位。即使您按行分割文件以获取块的精确位置,您也会遇到一些问题。

这是一个不太理想的解决方案(我假设你不想按顺序处理行:这看起来很明显):

  • 像你第一次尝试一样把文件切成块;
  • 对于每个块,找到第一个和最后一个换行符。块格式为:B\nM\nA,其中B(之前)和A(之后)不包含任何换行符,但M可能包含换行符;
  • 处理M中的行并将B\nA放在当前块索引的列表中;
  • 最后,处理所有B\nA元素。

这是次优的,因为一旦你处理了每个M,你仍然必须处理所有的B\nA,并且最后的工作必须等待其他进程完成。

这是代码:

def chunkify(file_end, buf_size=1024):
    """Yield chunks of `buf_size` bytes"""
    for chunk_start in range(0, file_end, buf_size):
        yield chunk_start, min(buf_size, file_end - chunk_start)

def process_batch(remainders, i, f, chunk_start, chunk_size):
    """Process a chunk"""
    f.seek(chunk_start)
    chunk = f.read(chunk_size)
    chunk, remainders[i] = normalize(chunk)
    # process chunk here if chunk is not None
    return chunk

def normalize(chunk):
    """Return `M, B\\nA`
    The chunk format is `B\\nM\\nA` where `B` (before) and `A` (after) do not contains any line feed,
    but `M` may contain line feeds"""
    i = chunk.find(b"\n")
    j = chunk.rfind(b"\n")
    if i == -1 or i == j:
        return None, chunk
    else:
        return chunk[i+1:j], chunk[:i]+chunk[j:]

请注意,如果块没有中间(M部分),那么我们将None作为块返回,并将所有内容发送到remainders

一些测试:

text = """She returned bearing mixed lessons from a society where the tools of democracy still worked.
If you think you can sense a "but" approaching, you are right.
Elsewhere, Germany take on Brazil and Argentina face Spain, possibly without Lionel Messi.
What sort of things do YOU remember best?'
Less than three weeks after taking over from Lotz at Wolfsburg.
The buildings include the Dr. John Micallef Memorial Library.
For women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for breast cancer.
In one interview he claimed it was from the name of the Cornish language ("Kernewek").
8 Goldschmidt was out of office between 16 and 19 July 1970.
Last year a new law allowed police to shut any bar based on security concerns.
But, Frum explains: "Glenn Beck takes it into his head that this guy is bad news."
Carrying on the Romantic tradition of landscape painting.
This area has miles of undeveloped beach adjacent to the headlands.
The EAC was created in 2002 to help avoid a repeat of the disputed 2000 presidential election.
In May 1945, remnants of the German Army continue fight on in the Harz mountains, nicknamed "The Void" by American troops.
Dietler also said Abu El Haj was being opposed because she is of Palestinian descent.
The auction highlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disorder.
GAAP operating profit was $13.2 million and $7.1 million in the second quarter of 2008 and 2007, respectively.
Doc, Ira, and Rene are sent home as part of the seventh bond tour.
only I am sick of always hearing him called the Just.
Also there is Meghna River in the west of Brahmanbaria.
The explosives were the equivalent of more than three kilograms of dynamite - equal to 30 grenades," explained security advisor Markiyan Lubkivsky to reporters gathered for a news conference in Kyiv.
Her mother first took her daughter swimming at the age of three to help her with her cerebal palsy.
A U.S. aircraft carrier, the USS "Ticonderoga", was also stationed nearby.
Louis shocked fans when he unexpectedly confirmed he was expecting a child in summer 2015.
99, pp.
Sep 19: Eibar (h) WON 6-1"""

import io, os

def get_line_count(chunk):
    return 0 if chunk is None else len(chunk.split(b"\n"))

def process(f, buf_size):
    f.seek(0, os.SEEK_END)
    file_end = f.tell()
    remainders = [b""]*(file_end//buf_size + 1)
    L = 0
    for i, (start, n) in enumerate(chunkify(file_end, buf_size)):
        chunk = process_batch(remainders, i, f, start, n)
        L += get_line_count(chunk)

    print("first pass: lines processed", L)
    print("remainders", remainders)
    last_chunk = b"".join(remainders)
    print("size of last chunk {} bytes, {} lines".format(len(last_chunk), get_line_count(last_chunk)))
    L += get_line_count(last_chunk)
    print("second pass: lines processed", L)

process(io.BytesIO(bytes(text, "utf-8")), 256)
process(io.BytesIO(bytes(text, "utf-8")), 512)

with open("/home/jferard/prog/stackoverlfow/ep+gutenberg+news+wiki.txt", 'rb') as f:
    process(f, 4096)
with open("/home/jferard/prog/stackoverlfow/ep+gutenberg+news+wiki.txt", 'rb') as f:
    process(f, 16384)

输出:

first pass: lines processed 18
remainders [b'She returned bearing mixed lessons from a society where the tools of democracy still worked.\nWhat sort', b" of things do YOU remember best?'\nFor women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for br", b'east cancer.\nBut, Frum explai', b'ns: "Glenn Beck takes it into his head that this guy is bad news."\nThe EAC was created in 2002 to help avoid a repeat of the dispu', b'ted 2000 presidential election.\nThe auction hig', b"hlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disor", b'der.\nAlso there is Meghn', b'a River in the west of Brahmanbaria.\nHer mother first to', b'ok her daughter swimming at the age of three to help her with her cerebal palsy.\nS', b'ep 19: Eibar (h) WON 6-1']
size of last chunk 880 bytes, 9 lines
second pass: lines processed 27

first pass: lines processed 21
remainders [b'She returned bearing mixed lessons from a society where the tools of democracy still worked.\nFor women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for br', b'east cancer.\nThe EAC was created in 2002 to help avoid a repeat of the dispu', b"ted 2000 presidential election.\nThe auction highlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disor", b'der.\nHer mother first to', b'ok her daughter swimming at the age of three to help her with her cerebal palsy.\nSep 19: Eibar (h) WON 6-1']
size of last chunk 698 bytes, 6 lines
second pass: lines processed 27

first pass: lines processed 96963
remainders [b'She returned bearing mixed lessons from a society where the tools of democracy still worked, but where the native Dutch were often less than warm to her and her fellow exiles.\nOne of the Ffarquhar ', ...,  b'the old device, Apple will give customers a gift card that can be applied toward the purchase of the new iPhone.']
size of last chunk 517905 bytes, 3037 lines
second pass: lines processed 100000

first pass: lines processed 99240
remainders [b'She returned bearing mixed lessons from a society where the tools of democracy still worked, but where the native Dutch were often less than warm to her and her fellow exiles.\nSoon Carroll was in push-up position walking her hands tow', b'ard the mirror at one side of the room while her feet were dragged along by the casual dinnerware.\nThe track "Getaway" was inspired by and allud', ..., b'the old device, Apple will give customers a gift card that can be applied toward the purchase of the new iPhone.']
size of last chunk 130259 bytes, 760 lines
second pass: lines processed 100000

最后一个示例显示您可以并行处理100,000行中的99,240行,但在完成所有过程后,您必须处理最后760行(130kio)。

关于并发性的注意事项:每个子进程拥有remainders列表的固定单元格,因此不应存在内存损坏。将每个余数存储在自己的进程对象(真实子进程的包装器)中,并在进程完成后连接所有剩余部分可能更简洁。


1
投票

当文件在文本模式(第二个代码示例)中打开时,readsize参数视为“字符数”(而非字节),但seektell与文件中当前位置的“空缓冲区”相关,因此:

  • 您可以从read计算块大小(供len(l)使用)
  • 使用file_end = stat(pfin).st_size计算最后一个块的大小是不正确的(因为对于utf-8编码,非拉丁字母的字符数可能不等于使用的字节数)
  • f.tell()仍然无法用于计算块大小,但为chunk_start提供了正确的结果。我认为这与TextIOWrapper的缓冲有某种关系:tell给出了有关缓冲区+解码器状态的信息,而不是关于文本流中的实际位置。您可以查看参考实现(def _read_chunkdef tell),看看它是如此复杂,没有人应该相信从不同的tell / seek调用计算的增量("# Grab all the decoded text (we will rewind any extra bits later)."给出了“错误”位置的原因的另一个暗示)

寻找/告诉工作正确“寻找”,但不能用于计算tell-s之间的字符数(偶数字节数将不正确)。要获得正确的byte deltas,应使用二进制非缓冲模式(with open(path, 'rb', buffering=0) as f: ...),但在这种情况下,开发人员应确保所有读取都返回“完整字符”(在“utf-8”中,不同的字符具有不同的字节长度)

但简单地使用chunk_size + =len(l)解决了所有问题,因此您可以使用文本模式继续打开文件!您的代码的下一个修改版本似乎按预期工作:

from functools import partial


def chunkify(pfin, max_lines=1500):
    with open(pfin, 'r', encoding='utf-8') as f:
        chunk_start = f.tell()
        chunk_size = 0
        done = True

        for idx, l in enumerate(iter(f.readline, '')):
            chunk_size += len(l)
            done = False
            if idx != 0 and idx % max_lines == 0:
                yield chunk_start, chunk_size
                done = True
                chunk_start = f.tell()
                chunk_size = 0

        if not done:
            yield chunk_start, chunk_size


def process_batch(pfin, chunk_start, chunk_size):
    with open(pfin, 'r', encoding='utf-8') as f:
        f.seek(chunk_start)
        chunk = f.read(chunk_size).splitlines()

    batch = list(filter(None, chunk))

    return batch


if __name__ == '__main__':
    fin = r'data/100000-ep+gutenberg+news+wiki.txt'

    process_func = partial(process_batch, fin)
    lines_n = 0

    prev_last = ''
    for start, size in chunkify(fin):
        lines = process_func(start, size)

        if not lines:
            continue

        # print first and last ten sentences of batch
        for line in lines[:10]:
            print(line)
        print('...')
        for line in lines[-10:]:
            print(line)
        print('\n')

        lines_n += len(lines)

    print(lines_n)
© www.soinside.com 2019 - 2024. All rights reserved.