我们有一个系统,它接收一堆“项目已更新/创建/...”事件,并负责在项目更改后对其进行昂贵的计算。目前,每次“项目已更新/创建/...”类型的事件到达时,该系统都会执行这种昂贵的计算。然而,对项目的更改通常是快速连续进行的,因此会在短时间内导致大量(重复的)昂贵的计算和系统负载。
我们希望将昂贵的计算延迟 30 分钟,以聚合快速连续的各种变化,并在该窗口结束时仅执行一次计算。
因此,当第一个“项目 X 已更新/创建/...”到达时,我们希望启动 30 分钟的计时器。该窗口期间与项目 X 相关的任何事件都不会延长启动新窗口的窗口持续时间。 30 分钟结束时执行计算。如果在缓存擦除之后事件到达,则会启动一个 30 分钟的新窗口
项目 Y 也可能同时存在一个类似的、不相关的窗口。我们总共有 500,000 个项目。
是否可以帮助我们提供一些关于如何有效实现这种“聚合”窗口的指示/建议?有没有一种高效的数据结构可以解决我们的问题?
我们考虑过将所有事件添加到数据库中,然后通过 cron 每 30 分钟执行一次分组查询,但这有一些缺点:
ConcurrentHashMap
为每个项目存储 ScheduledFuture
,并使用 ScheduledExecutorService
来延迟昂贵计算的执行。
当事件到达时,您检查该项目是否已经有
ScheduledFuture
。如果有的话,你什么也不做。如果没有,则创建一个新的 ScheduledFuture
。
import java.util.concurrent.*;
public class ExpensiveComputation {
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
private final ConcurrentHashMap<String, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();
public void itemUpdated(String itemId) {
futures.computeIfAbsent(itemId, id -> executorService.schedule(() -> {
performComputation(id);
futures.remove(id);
}, 30, TimeUnit.MINUTES));
}
private void performComputation(String itemId) {
// Perform the expensive computation here
System.out.println("Performing computation for item " + itemId);
}
}
ExppressiveComputation 是一个管理每个项目的 ScheduledFutures 的类。更新项目时会调用 itemUpdated 方法。它检查该项目是否已经有 ScheduledFuture,如果没有,它会创建一个新的 ScheduledFuture,该新的 ScheduledFuture 将在 30 分钟后调用 PerformComputation。
performComputation 方法是放置昂贵计算代码的地方。执行计算后,它从 ConcurrentHashMap 中删除 ScheduledFuture。
代码假设事件在多个线程中处理。 ConcurrentHashMap 和 ScheduledExecutorService 是线程安全的,因此您不需要添加锁来防止竞争条件。
编辑:
如果计算成本很高并且需要大量时间,那么最好在执行计算之前从映射中删除 ScheduledFuture。这样,如果在执行计算时有更新到达,则会安排新的计算。
import java.util.concurrent.*;
public class ExpensiveComputation {
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
private final ConcurrentHashMap<String, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();
public void itemUpdated(String itemId) {
futures.computeIfAbsent(itemId, id -> executorService.schedule(() -> {
futures.remove(id);
performComputation(id);
}, 30, TimeUnit.MINUTES));
}
private void performComputation(String itemId) {
// Perform the expensive computation here
System.out.println("Performing computation for item " + itemId);
}
}
我认为这取决于你的缓存系统。 一般来说,您可以为每个键设置 TTL 并禁用缓存失效。
如果您使用Redis,您可以为您的密钥设置EXPIRE。 https://redis.io/commands/expire/
我认为你应该使用咖啡
caffine 将缓存存储在内存中
那么你
1、设置expireAfterWrite
Caffeine.newBuilder().expireAfterWrite(Duration.ofSeconds(30)).build()