我有两个微服务,我需要在它们之间实现可靠的通知。我考虑过使用 redis 流 - serviceA 将向 serviceB 发送带有标识符 X 的请求。 一旦 serviceB 完成了 serviceA 要求的工作,它将创建/添加一个新项目到流(该流特定于 X),以让它知道它已经完成。
ServiceA可以发送多个请求,每个请求可能有不同的标识符。所以它会阻止不同流中的新元素。
我的问题是如何删除不再需要的流,具体取决于它们的年龄。例如,我想删除一天前创建的流。这可能吗?
如果不是,我很想听听您关于如何在 Redis 中消除不需要的流的任何想法。
谢谢
没有直接的方法可以根据 TTL/年龄删除旧条目。您可以将
XTRIM/XDEL
与其他命令组合使用来修剪流。
让我们看看如何使用
XTRIM
XTRIM 流 MAXLEN ~ 尺寸
XTRIM 将流修剪为给定数量的项目,并根据需要驱逐较旧的项目(ID 较低的项目)。
您可以根据删除策略每天或定期生成流大小,并使用
XLEN
命令将其存储在某处
运行一个定期作业,将 XTRIM 称为
XTRIM x-stream MAXLEN ~ (NEW_SIZE - PREVIOUS_SIZE)
例如,昨天的流大小是 500,现在是 600,那么我们需要删除 500 个条目,这样我们就可以运行
XTRIM x-stream MAXLEN ~ 100
您可以使用不同的删除策略,例如每天、每周、每周两次等。
XDEL 流 ID [ID...]
从流中删除指定的条目,并返回删除的条目数,如果某些 ID 不存在,该数可能与传递给命令的 ID 数不同。
因此,您可以做的是,每当服务 B 消费该事件时,服务本身就可以删除流条目,因为服务 B 知道流 ID,但一旦您开始使用消费者组,这将不起作用。所以我会说使用 Redis set 或 Redis map 来跟踪确认流 ID 并运行定期清理作业来清理流。
例如
服务 A 向服务 B 发送 ID 为 1 的流项 服务 B 在使用映射中的项目后确认流项目 ack_stream = { ID1: true },您可以跟踪其他数据,例如以消费群体为例。
扫描作业会定期运行,例如每天凌晨 1 点,读取 ack_stream 的所有元素并过滤掉所有需要删除的项目。现在您可以使用一组流 ID 批量调用
XDEL
命令。
我认为问题是关于删除未使用的流 - 而不是流中的消息,如前面的答案中所述。
您可以为流指定 TTL,就像为任何其他数据元素指定一样。
示例
创建流并添加一些消息:
XADD stream1 * field1 value-101 field2 value-201
XADD stream1 * field1 value-102 field2 value-202
XADD stream2 * field1 value-301 field2 value-401
XADD stream2 * field1 value-302 field2 value-402
120 秒后设置流的 TTL:
EXPIRE stream1 120
(integer) 1
TTL stream1
(integer) 116
向流中添加新消息不会影响过期时间:
XADD stream2 * field1 value-501 field2 value-501
TTL stream1
(integer) 98
保持流处于活动状态的一个选项是每次通过流发送消息时发出过期命令。
XADD stream2 * field1 value-301 field2 value-401
EXPIRE stream2 120
XADD stream2 * field1 value-302 field2 value-402
EXPIRE stream2 120
这将确保流在使用期间一直处于活动状态,并且在不使用时被删除。