如果这是一个愚蠢的问题,我深表歉意。
我有一个场景,其中有3个来自上游服务的主题(未加密)。不幸的是,我无法更改3个主题的行为。
上游服务在一天结束时批量发布所有消息,我需要获得交易的累积视图,因为交易的顺序对下游服务很重要。
我知道我无法在主题的不同分区中对消息进行重新排序,所以我确定了是否可以对其进行累加,然后我的服务可以采用累加的结果并对它们进行重新排序,然后再进行处理。
但是,我注意到一种奇怪的行为,希望有人能澄清我所缺少的内容。
当我使用1到500个帐户进行操作时,我看到在输出主题中累积并显示了500条消息。
但是,当我尝试使用10,000个帐户执行相同的操作时,看到的输出比应有的多。 (关于输出主题的13,000条消息)。
KStream<String, TransactionAccumulator> transactions =
disbursements
.merge(repayments)
.merge(fees)
.groupBy(
(k, v) -> v.getAccountId(),
with(
String(),
serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(Transaction.class, mapper))))
.windowedBy(SessionWindows.with(Duration.of(1, ChronoUnit.MINUTES)))
.aggregate(
TransactionAccumulator::new,
(key, value, aggregate) -> aggregate.add(value),
(aggKey, aggOne, aggTwo) -> aggOne.merge(aggTwo),
Materialized.with(
String(),
serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(TransactionAccumulator.class, mapper))))
.toStream((key, value) -> key.key());
如前所述,上游服务在一天结束时(而不是实时)批量发布所有事件。
将不胜感激,因为对于较小的体积,它似乎可以工作。
KStream.aggregate()
:Kafka Stream使用record cache来控制从aggregate
的物化视图(或KTable)向状态存储和下游处理器发出汇总更新的速率。例如:
with messages:
("word1", 4)
("word1", 2)
("word2", 3)
("word1", 1)
and your word count topolgy:
KStream.groupByKey()
.aggregate(() -> 0, (word, newWordCnt, aggsWordCnt) -> aggsWordCnt + newWordCnt, Materialized.as("word-cnt-store").withValueSerde(Serdes.Integer())
.toStream();
you may received downstream messages like this:
("word1", 6)
("word2", 3)
("word1", 7)
如果您的接收器是幂等的,您可以使用键覆盖TransactionAccumulator
,也可以按[此处]所述使用KTable.suppress()
以只发出聚合窗口2的最后一条消息。