我通过以下情况简化了我的问题:3个朋友共享一张会员卡。该卡有两个限制
我使用的是这样的kafka生产者,它在kafka集群中发送事件
{“名称”:“ friend_1”,“值”:10}
{“ name”:“ friend_3”,“ value”:20}
事件被发布到与kafka流相关的主题,该流按密钥分组并进行汇总以汇总花费的金额。这似乎可行,但是我遇到了“并发问题”
让我们对卡进行成像操作9次,因此仅需使用1次,总花费为190,这意味着还有10个单位可以消费。
因此,friend_2想要购买的东西的价格为11个单位(不应允许),friend_3想要购买的东西的价格为9个单位,应该允许。 Friend_3将第十次使用该卡修改状态。以后所有其他尝试都不应修改某些内容。
因此,卡用户知道他发送的事件是否修改了最大使用次数和总计数似乎是合理的。如何在kafka中进行?使用流聚合,我总是可以增加值,但是如何知道我的操作是否“修改了卡的状态”?
根据我对您问题的理解,有几种选择。
一种选择是在您filter()
聚合之后派生新流,以获取会修改卡“状态”的数据,例如过滤所有花费> 200
单位或使用> 10
的事件。然后,该流可以用于通知卡用户该卡已经用完,例如通过发送电子邮件。这种方法只能通过DSL来实现。
[需要更大的灵活性或更严格的控制时,另一种选择是使用Processor API(您可以将其与DSL集成在一起,以便大多数代码可以继续使用DSL),在此您自己实施聚合步骤(使用状态存储您附加到Transformer
或Processor
的文件)。在聚合期间,您可以实现检查传入事件是否有效的逻辑(在您的示例中:9个单位的friend_3有效,11个单位的friend_2无效)。如果有效,则聚合将增加卡的计数器(单位和用途),仅此而已。如果无效,则该事件将被丢弃并且不会修改计数器,Transformer
/ Processor
可以向另一个流发送新事件,该事件告诉卡用户某件事不起作用。您可以类似地实现该功能,以通知用户卡片已被完全使用,卡片不再可用或该卡片的任何其他“状态更改”。
此外,根据您要执行的操作,请查看Kafka Streams的interactive queries功能。有时其他应用程序可能希望对某些事物的最新状态(例如卡的状态)进行快速的点查找(查询),这可以通过例如REST API。
希望这会有所帮助!