files = [file_1, file_2, file_3, file_4, file_5, file_6, file_7, file_8, file_9, file_10]

我需要确定这些文件中是否存在某个字符串。通常,这个字符串出现在一段连续的文件范围内,例如从 file_1 到 file_4,或从 file_3 到 file_10。

目前,我正在循环访问此列表并依次打开每个文件并检查其中是否存在该字符串。但这很慢,因为要搜索数千个字符串,并且要搜索的文件超过 50 个。






import bisect
import platform
from functools import lru_cache
from pathlib import Path
from statistics import mean, median
from typing import Sequence, Generator, TypeVar, Any

import pytest  # only required for parametric tests

# NOTE: the `key` parameter has been added to `bisect` in Python 3.10, so
#       if it needs to work on Python < 3.10, then the `files` sequence itself
#       would have to be reversed, and indexes treated accordingly.
#       Because it is confusing to implement, I prefer not trying.
assert platform.python_version_tuple() >= ("3", "10", "0")

class File:
    def __init__(self, filepath: Path) -> None:
        self._path = filepath
        # memoization at the instance-level
        self.contains_string = lru_cache(maxsize=None)(self.contains_string)

    def contains_string(self, s: str) -> bool:
        return s in self._path.read_text()

    def __repr__(self) -> str:  # for convenience
        name = self._path.name
        return f"File({name=!r})"

    __str__ = __repr__

    def cache_info(self) -> tuple[int, int, int, int]:
        """hits, misses, maxsize, currsize"""
        return self.contains_string.cache_info()

    def __members(self) -> tuple[Any, ...]:  # cf https://stackoverflow.com/a/45170549/11384184
        return self._path,

    # required by https://docs.python.org/3/faq/programming.html#how-do-i-cache-method-calls

    def __eq__(self, other) -> bool:
        if isinstance(other, File):
            return self.__members == other.__members

    def __hash__(self) -> int:
        return hash(self.__members)

def create_bool_seq_for_test(start_index, end_index, length) -> Sequence[bool]:
    """Build a list of `length` booleans that is True exactly on
    [start_index, end_index] (both inclusive) and False everywhere else.

    The indexes must satisfy 0 <= start_index <= end_index < length.
    """
    assert 0 <= start_index <= end_index < length
    true_count = end_index - start_index + 1
    trailing_false_count = length - end_index - 1
    flags = [
        *([False] * start_index),
        *([True] * true_count),
        *([False] * trailing_false_count),
    ]
    assert len(flags) == length
    return flags

# Largest `length` (inclusive) enumerated by the comprehension that feeds the
# parametric test: for every length, every (start_index, end_index) pair with
# start_index <= end_index is generated, hence the roughly cubic growth.
NUMBER_OF_PARAMETRIC_TESTS_CUBED = 5  # /!\ will result in ~cubed (^3) number of tests
# Number of tests for a few values of N
# NOTE(review): these figures look approximate (possibly off by one) — confirm
# against an actual pytest collection before relying on them.
# N=1   --> 1      test
# N=5   --> 34     tests
# N=10  --> 219    tests
# N=20  --> 1539   tests
# N=50  --> 22099  tests
# N=100 --> 171699 tests


# Content written into the generated test files.
# NOTE(review): both names are used by the tests below but were not defined in
# the visible file; these values are placeholders — any pair works as long as
# STRING_PRESENT is not a substring of STRING_ABSENT.
STRING_PRESENT = "the string we are looking for"
STRING_ABSENT = "some unrelated content"


class Test:
    """pytest suite for `find_start_and_end_indexes` and its helpers.

    Test files are created in pytest's `tmp_path`; a file contains
    STRING_PRESENT when its flag is True and STRING_ABSENT otherwise.
    """

    def test_sample_case(self, tmp_path: Path) -> None:
        """A single hand-picked case, cross-checked against the oracle."""
        # Given
        expected_start_at_index = 46
        expected_end_at_index = 48
        length = 100
        files = self.create_files_for_test(
            create_bool_seq_for_test(expected_start_at_index, expected_end_at_index, length),
            tmp_path,
        )
        s = STRING_PRESENT
        # When
        actual_start, actual_end = find_start_and_end_indexes(files, s)
        # Then
        assert (expected_start_at_index, expected_end_at_index) == (actual_start, actual_end)
        assert (expected_start_at_index, expected_end_at_index) == self.oracle(files, s)  # sanity check

    @pytest.mark.parametrize(
        ("expected_start_at_index", "expected_end_at_index", "length", "presences"),
        [
            (start_index, end_index, length, create_bool_seq_for_test(start_index, end_index, length))
            for length in range(1, NUMBER_OF_PARAMETRIC_TESTS_CUBED+1)
            for start_index in range(length)
            for end_index in range(start_index, length)
        ],
    )
    def test_parametric(self, expected_start_at_index: int,
                        expected_end_at_index: int, length: int, presences: Sequence[bool], tmp_path: Path) -> None:
        """Parametric test for all combinations of start/end/length."""
        files = self.create_files_for_test(presences, tmp_path)
        assert (expected_start_at_index, expected_end_at_index) == \
               find_start_and_end_indexes(files, STRING_PRESENT)

    def test_performance(self, tmp_path: Path) -> None:
        """Compare the per-file cache statistics of the search and of the oracle.

        A cache miss corresponds to an actual read of the file, a hit to a
        re-used result.
        """
        # Given
        expected_start_at_index = 21
        expected_end_at_index = 27
        length = 100
        files = self.create_files_for_test(
            create_bool_seq_for_test(expected_start_at_index, expected_end_at_index, length),
            tmp_path,
        )
        s = STRING_PRESENT

        # When
        find_start_and_end_indexes(files, s)
        hits_sut, misses_sut, _, _ = zip(
            *tuple(file.cache_info() for file in files)
        )

        # reset the per-file caches so the oracle is measured from scratch
        for file in files:
            file.contains_string.cache_clear()

        self.oracle(files, s)
        hits_oracle, misses_oracle, _, _ = zip(
            *tuple(file.cache_info() for file in files)
        )

        # Then
        mean_hits_sut, median_hits_sut = \
            mean(hits_sut), median(hits_sut)
        mean_misses_sut, median_misses_sut = \
            mean(misses_sut), median(misses_sut)

        mean_hits_oracle, median_hits_oracle = \
            mean(hits_oracle), median(hits_oracle)
        mean_misses_oracle, median_misses_oracle = \
            mean(misses_oracle), median(misses_oracle)

        print(f"{mean_hits_sut=!r} {median_hits_sut=!r}")
        print(f"{mean_misses_sut=!r} {median_misses_sut=!r}")
        print(f"{mean_hits_oracle=!r} {median_hits_oracle=!r}")
        print(f"{mean_misses_oracle=!r} {median_misses_oracle=!r}")

        assert mean_hits_sut >= mean_hits_oracle  # we may re-use some previous results
        assert mean_misses_sut < mean_misses_oracle  # we do fewer reads (misses)

    def test_split_range_recursively(self) -> None:
        """`iter_bisect_like` must yield every element of its input exactly once."""
        for size in range(1, 100+1):
            r = range(size)
            values = tuple(iter_bisect_like(r))  # consume all
            assert sorted(values) == list(r)

    @staticmethod
    def oracle(files: Sequence[File], s: str) -> tuple[int, int]:
        """Reference implementation: scan the files in order.

        Returns the (start, end) indexes, both inclusive, of the first run of
        files containing `s`. Raises ValueError when no file contains `s`.
        """
        # using a dumb way
        start_index = None
        end_index = None
        has_started = False
        for i, file in enumerate(files):
            is_present = file.contains_string(s)
            if is_present and not has_started:
                has_started = True
                start_index = i
            elif has_started and not is_present:
                end_index = i - 1
                # the run is over: stop here, otherwise every later file
                # without the string would overwrite `end_index`
                break
        if start_index is None:
            raise ValueError("start_index not found")
        if end_index is None:
            # the run of matching files extends up to the very last file
            end_index = len(files) - 1
        return start_index, end_index

    @staticmethod
    def create_files_for_test(presences: Sequence[bool], dirpath: Path) -> Sequence[File]:
        """Write one text file per flag into `dirpath` and return the wrapping `File`s."""
        files = []
        for i, is_present in enumerate(presences):
            filepath = dirpath / f"File_{i!s}.txt"
            filepath.write_text(STRING_PRESENT if is_present else STRING_ABSENT)
            files.append(File(filepath))
        return files

def find_start_and_end_indexes(files: Sequence[File], s: str) -> tuple[int, int]:
    """Return the (start, end) indexes, both inclusive, of the files containing `s`.

    The files containing `s` are expected to form one contiguous run inside
    `files`; the two bisections below rely on that.

    Raises:
        ValueError: if `files` is empty or no file contains `s`.
    """
    if len(files) == 0:
        raise ValueError("cannot search for a string in an empty sequence of files")
    # step 1 : find any True value (somewhere in the middle of the run)
    # a default is given to `next` so that an unsuccessful search does not
    # leak a bare StopIteration to the caller
    some_present_index = next(
        (i for i in iter_bisect_like(range(len(files)))
         if files[i].contains_string(s)),
        None,
    )
    if some_present_index is None:
        raise ValueError(f"string {s!r} not found in any of the {len(files)} files")
    # step 2 : find the leftmost True value to the left
    start_index = find_the_leftmost_true_value(files, hi=some_present_index, s=s)
    # step 3 : find the rightmost True value to the right
    end_index = find_the_rightmost_true_value(files, lo=some_present_index, s=s)
    return start_index, end_index

def find_the_leftmost_true_value(files: Sequence[File], hi: int, s: str) -> int:
    """Bisect `files[:hi]` for the first file that contains `s`.

    `hi` itself is returned when none of the files before it contains `s`.
    The files before `hi` are expected to be ordered "absent, then present".
    """
    # booleans are the integers 0 and 1 in Python, hence they can be ordered
    assert (0, 1) == (False, True)  # proof
    assert False < True

    def has_string(candidate: File) -> bool:
        return candidate.contains_string(s)

    # we look for the boundary between the False keys and the True keys,
    # which is exactly where a True would be inserted on the left
    leftmost_index = bisect.bisect_left(files, True, hi=hi, key=has_string)
    return leftmost_index

def find_the_rightmost_true_value(files: Sequence[File], lo: int, s: str) -> int:
    """Bisect `files[lo:]` for the last file that contains `s`.

    The files from `lo` onwards are expected to be ordered "present, then absent".
    """
    # booleans are the integers 0 and 1 in Python, hence they can be ordered
    assert (0, 1) == (False, True)  # proof
    assert False < True

    def lacks_string(candidate: File) -> bool:
        # negating the presence turns "True then False" into the ascending
        # order "False then True" that bisect requires
        return not candidate.contains_string(s)

    # pretend we want to insert a False after all the existing False keys:
    # the insertion point is **one after** the rightmost file containing `s`
    one_past_rightmost = bisect.bisect_right(files, False, lo=lo, key=lacks_string)
    return one_past_rightmost - 1

T = TypeVar("T")


def iter_bisect_like(values: Sequence[T]) -> Generator[T, None, None]:
    """
    Produces in a bisect-like manner the values from the sequence.

    The middle element comes first, then the middles of the two remaining
    halves, then the middles of the four remaining quarters, and so on
    (breadth-first), until every element has been produced exactly once.
    An empty sequence produces nothing.

    Example: iter_bisect_like(range(101)) yields
             50, 25, 76, 12, 38, 63, 89, 6, 19, ...
    """
    from collections import deque  # local import: O(1) pops from the left end

    if len(values) == 0:
        return  # nothing to split
    all_ranges_left = deque([range(len(values))])
    while all_ranges_left:
        r = all_ranges_left.popleft()
        assert r.start < r.stop  # sanity check
        # find the middle point
        middle_index = (r.start + r.stop) // 2
        left_remaining_range = range(r.start, middle_index)
        right_remaining_range = range(middle_index + 1, r.stop)
        # only non-empty halves are queued (an empty range is falsy)
        if left_remaining_range:
            all_ranges_left.append(left_remaining_range)
        if right_remaining_range:
            all_ranges_left.append(right_remaining_range)
        yield values[middle_index]
    return  # end of the generator


  • 创建了一个 `File` 类,在实例级别缓存(memoization)`contains_string` 的结果
  • 一个函数 `create_bool_seq_for_test`,用于创建由若干个 `False`、然后若干个 `True`、然后若干个 `False` 组成的测试序列
  • 创建与这些 True/False 序列对应的文件的测试方法
  • 样本测试
  • 参数测试,检查代码对于所有可能的组合在功能上是否有效
  • 实现顺序检查的“预言机”
  • 性能测试 `test_performance`;对于某些取值它会失败(例如,当字符串只出现在最前面的几个文件中时,顺序检查的“预言机”(oracle)本身就非常快)
  • 以二等分方式迭代值的函数,用于核心算法


