我有一个事件驱动的应用程序,需要执行以下操作,其中每当收到事件时都会调用
handle_event()
函数:
from datetime import datetime
from dataclasses import dataclass
@dataclass
class Event:
event_id: int
occurrence_time: datetime
processing_time: datetime
def __post_init__(self) -> None:
if self.occurrence_time > self.processing_time:
raise ValueError("Impossible event! An event can never be processed before it occurs.")
_all_events: list[Event]
def handle_event(curr_event: Event) -> None:
past_events = (e in _all_events if e.occurrence_time < curr_event.occurrence_time)
try:
prev_event = max(
prev_events, key=lambda e: (e.occurrence_time, e.processing_time),
)
except ValueError: # "ValueError: max() arg is an empty sequence"
prev_event = None
_all_events.append(curr_event)
do_something(prev_event, curr_event)
我可以做些什么来降低这个算法的时间复杂度吗?我愿意改变
all_events
的存储方式,使用某种缓存/记忆,将数据存储在某种单独的索引数据结构中,等等。
我的系统具有以下特点:
all_events
将始终按处理时间排序。do_something()
函数内部的业务逻辑本质上是一个递归算法,也不能改变以不同的方式工作。为了进行本练习,假设每次调用 do_something()
需要 1 毫秒。这一切都运行得很好,除非
all_events
变大。在最常见的记录大约按发生时间顺序到达的情况下,这种查找效率非常低,并且当已经存储了很多事件时,这种查找的成本非常高。具体来说,如果大多数事件按发生时间顺序到达,我最终将扫描每个事件的大部分或全部 all_events
列表。这个
在实际规模的真实数据测试中,此过程变得异常缓慢,需要“几天”才能在中等大小的 VPS 上处理 CPython 3.11 中的约 100 万个事件。我相信我已经编写了一个“意外二次”算法,并且有大量记录,我处于抛物线的极其陡峭的区域。 我碰巧在这里使用Python,但这个问题比任何特定于Python的问题都更具概念性。
注意:在我的实际应用程序中,all_events
列表存储在数据库中,该数据库自动神奇地对数据进行分区和索引以支持此类任务(TimescaleDB)。但是,我“不是”寻找依赖于访问任何特定数据库的解决方案。我想从“算法和概念的角度”关注这个问题,而不是依赖于任何可能存在的现有数据库功能/索引/优化。我偶尔需要实现类似的逻辑,但
没有访问此类数据库的好处,并且我想避免编写效率极低的代码。您可以通过维护按
occurrence_time
排序的(第二个?)列表来改进一点。为此,您可以受益于bisect
。
_all_events: list[Event]
_sorted_events: list[Event]
def handle_event(curr_event: Event) -> None:
_all_events.append(curr_event)
i = bisect_left(_sorted_events, curr_event.occurrence_time,
key=attrgetter('occurrence_time'))
_sorted_events.insert(i, curr_event)
prev_event = _sorted_events[i-1] if i else None
do_something(prev_event, curr_event)
为了做得更好,您可以使用实现某种排序容器的库,例如基于 AVL 树、红黑树、跳过列表、B 树等。例如,使用
sortedcontainers
:
_all_events: list[Event]
_sorted_events = SortedList(key=attrgetter('occurrence_time'))
def handle_event(curr_event: Event) -> None:
_all_events.append(curr_event)
i = _sorted_events.bisect_left(curr_event)
_sorted_events.add(curr_event)
prev_event = _sorted_events[i-1] if i else None
do_something(prev_event, curr_event)