如何最多每 30 分钟触发一次昂贵的计算?

问题描述 投票:0回答:3

我们有一个系统,它接收一堆“项目已更新/创建/...”事件,并负责在项目更改后对其进行昂贵的计算。目前,每次“项目已更新/创建/...”类型的事件到达时,该系统都会执行这种昂贵的计算。然而,对项目的更改通常是快速连续进行的,因此会在短时间内导致大量(重复的)昂贵的计算和系统负载。

我们希望将昂贵的计算延迟 30 分钟,以聚合快速连续的各种变化,并在该窗口结束时仅执行一次计算。

因此,当第一个“项目 X 已更新/创建/...”到达时,我们希望启动 30 分钟的计时器。该窗口期间与项目 X 相关的任何事件都不会延长启动新窗口的窗口持续时间。 30 分钟结束时执行计算。如果在缓存擦除之后事件到达,则会启动一个 30 分钟的新窗口

项目 Y 也可能同时存在一个类似的、不相关的窗口。我们总共有 500,000 个项目。

是否可以帮助我们提供一些关于如何有效实现这种“聚合”窗口的指示/建议?有没有一种高效的数据结构可以解决我们的问题?

我们考虑过将所有事件添加到数据库中,然后通过 cron 每 30 分钟执行一次分组查询,但这有一些缺点:

  • 所有item重新计算都是同时执行的(cron间隔触发的时间),这对系统造成很大的负载。因此,这种固定的窗口间隔可能不是一个好的解决方案。因此,我们希望第一个事件触发窗口的启动,而不是固定的 cron。
  • 将所有事件存储在数据库中的开销看起来并不吸引人。
  • 将窗口大小从 30 分钟更改为 30 秒可能会导致对数据库进行大量创建和删除。
  • ...
java algorithm data-structures
3个回答
1
投票

您可以使用

ConcurrentHashMap
为每个项目存储
ScheduledFuture
,并使用
ScheduledExecutorService
来延迟昂贵计算的执行。

当事件到达时,您检查该项目是否已经有

ScheduledFuture
。如果有的话,你什么也不做。如果没有,则创建一个新的
ScheduledFuture

import java.util.concurrent.*;

public class ExpensiveComputation {
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
    private final ConcurrentHashMap<String, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();

    public void itemUpdated(String itemId) {
        futures.computeIfAbsent(itemId, id -> executorService.schedule(() -> {
            performComputation(id);
            futures.remove(id);
        }, 30, TimeUnit.MINUTES));
    }

    private void performComputation(String itemId) {
        // Perform the expensive computation here
        System.out.println("Performing computation for item " + itemId);
    }
}

ExppressiveComputation 是一个管理每个项目的 ScheduledFutures 的类。更新项目时会调用 itemUpdated 方法。它检查该项目是否已经有 ScheduledFuture,如果没有,它会创建一个新的 ScheduledFuture,该新的 ScheduledFuture 将在 30 分钟后调用 PerformComputation。

performComputation 方法是放置昂贵计算代码的地方。执行计算后,它从 ConcurrentHashMap 中删除 ScheduledFuture。

代码假设事件在多个线程中处理。 ConcurrentHashMap 和 ScheduledExecutorService 是线程安全的,因此您不需要添加锁来防止竞争条件。

编辑:

如果计算成本很高并且需要大量时间,那么最好在执行计算之前从映射中删除 ScheduledFuture。这样,如果在执行计算时有更新到达,则会安排新的计算。

import java.util.concurrent.*;

public class ExpensiveComputation {
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
    private final ConcurrentHashMap<String, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();

    public void itemUpdated(String itemId) {
        futures.computeIfAbsent(itemId, id -> executorService.schedule(() -> {
            futures.remove(id);
            performComputation(id);
        }, 30, TimeUnit.MINUTES));
    }

    private void performComputation(String itemId) {
        // Perform the expensive computation here
        System.out.println("Performing computation for item " + itemId);
    }
}

0
投票

我认为这取决于你的缓存系统。 一般来说,您可以为每个键设置 TTL 并禁用缓存失效。

如果您使用Redis,您可以为您的密钥设置EXPIRE。 https://redis.io/commands/expire/


-1
投票

我认为你应该使用咖啡

caffine 将缓存存储在内存中

那么你

1、设置expireAfterWrite

Caffeine.newBuilder().expireAfterWrite(Duration.ofSeconds(30)).build()
© www.soinside.com 2019 - 2024. All rights reserved.