执行“小于 X 的最大值”查找的更有效算法

问题描述 投票:0回答:1

我有一个事件驱动的应用程序,需要执行以下操作,其中每当收到事件时都会调用

handle_event()
函数:

from datetime import datetime
from dataclasses import dataclass


@dataclass
class Event:
    event_id: int
    occurrence_time: datetime
    processing_time: datetime

    def __post_init__(self) -> None:
        if self.occurrence_time > self.processing_time:
            raise ValueError("Impossible event! An event can never be processed before it occurs.")


_all_events: list[Event]

def handle_event(curr_event: Event) -> None:
    past_events = (e in _all_events if e.occurrence_time < curr_event.occurrence_time)
    try:
        prev_event = max(
            prev_events, key=lambda e: (e.occurrence_time, e.processing_time),
        )
    except ValueError:  # "ValueError: max() arg is an empty sequence"
        prev_event = None

    _all_events.append(curr_event)
    do_something(prev_event, curr_event)

我可以做些什么来降低这个算法的时间复杂度吗?我愿意改变

all_events
的存储方式,使用某种缓存/记忆,将数据存储在某种单独的索引数据结构中,等等。


我的系统具有以下特点:

  • 事件到达我的系统的处理时间顺序,而不是发生时间。因此我知道
    all_events
    将始终按处理时间排序。
  • 大多数事件(假设大约 70 - 90%)按照发生时间正确排序到达,但其余事件到达较晚,这意味着它们在其他几个发生时间较晚的记录之后到达。这些是系统的基本特征,无法更改。
  • 同样,
    do_something()
    函数内部的业务逻辑本质上是一个递归算法,也不能改变以不同的方式工作。为了进行本练习,假设每次调用
    do_something()
    需要 1 毫秒。
  • 只有一个事件处理过程,因此我们可以忽略任何关于并发的问题。

这一切都运行得很好,除非

all_events
变大。在最常见的记录大约按发生时间顺序到达的情况下,这种查找效率非常低,并且当已经存储了很多事件时,这种查找的成本非常高。具体来说,如果大多数事件按发生时间顺序到达,我最终将扫描每个事件的大部分或全部
all_events
列表。这个

在实际规模的真实数据测试中,此过程变得异常缓慢,需要“几天”才能在中等大小的 VPS 上处理 CPython 3.11 中的约 100 万个事件。我相信我已经编写了一个“意外二次”算法,并且有大量记录,我处于抛物线的极其陡峭的区域。 我碰巧在这里使用Python,但这个问题比任何特定于Python的问题都更具概念性。

注意

:在我的实际应用程序中,all_events列表存储在数据库中,该数据库自动神奇地对数据进行分区和索引以支持此类任务(TimescaleDB)。但是,我“不是”寻找依赖于访问任何特定数据库的解决方案。我想从“算法和概念的角度”关注这个问题,而不是依赖于任何可能存在的现有数据库功能/索引/优化。我偶尔需要实现类似的逻辑,但

没有访问此类数据库的好处,并且我想避免编写效率极低的代码。
您可以通过维护按 occurrence_time 排序的(第二个?)列表来改进一点。为此,您可以受益于bisect

algorithm data-structures time-complexity
1个回答
0
投票
例如:

_all_events: list[Event]
_sorted_events: list[Event]

def handle_event(curr_event: Event) -> None:
    _all_events.append(curr_event)
    i = bisect_left(_sorted_events, curr_event.occurrence_time, 
                                    key=attrgetter('occurrence_time'))
    _sorted_events.insert(i, curr_event)
    prev_event = _sorted_events[i-1] if i else None 
    do_something(prev_event, curr_event)
为了做得更好,您可以使用实现某种排序容器的库,例如基于 AVL 树、红黑树、跳过列表、B 树等。

例如,使用

sortedcontainers

_all_events: list[Event] _sorted_events = SortedList(key=attrgetter('occurrence_time')) def handle_event(curr_event: Event) -> None: _all_events.append(curr_event) i = _sorted_events.bisect_left(curr_event) _sorted_events.add(curr_event) prev_event = _sorted_events[i-1] if i else None do_something(prev_event, curr_event)


© www.soinside.com 2019 - 2024. All rights reserved.