使用Kafka流累积来自多个主题的事件

问题描述 投票:0回答:1

如果这是一个愚蠢的问题,我深表歉意。

我有一个场景,其中有3个来自上游服务的主题(未加密)。不幸的是,我无法更改3个主题的行为。

上游服务在一天结束时批量发布所有消息,我需要获得交易的累积视图,因为交易的顺序对下游服务很重要。

我知道我无法在主题的不同分区中对消息进行重新排序,所以我确定了是否可以对其进行累加,然后我的服务可以采用累加的结果并对它们进行重新排序,然后再进行处理。

但是,我注意到一种奇怪的行为,希望有人能澄清我所缺少的内容。

当我使用1到500个帐户进行操作时,我看到在输出主题中累积并显示了500条消息。

但是,当我尝试使用10,000个帐户执行相同的操作时,看到的输出比应有的多。 (关于输出主题的13,000条消息)。

KStream<String, TransactionAccumulator> transactions =
    disbursements
        .merge(repayments)
        .merge(fees)
        .groupBy(
            (k, v) -> v.getAccountId(),
            with(
                String(),
                serdeFrom(
                    new JsonSerializer<>(mapper),
                    new JsonDeserializer<>(Transaction.class, mapper))))
        .windowedBy(SessionWindows.with(Duration.of(1, ChronoUnit.MINUTES)))
        .aggregate(
            TransactionAccumulator::new,
            (key, value, aggregate) -> aggregate.add(value),
            (aggKey, aggOne, aggTwo) -> aggOne.merge(aggTwo),
            Materialized.with(
                String(),
                serdeFrom(
                    new JsonSerializer<>(mapper),
                    new JsonDeserializer<>(TransactionAccumulator.class, mapper))))
        .toStream((key, value) -> key.key());

如前所述,上游服务在一天结束时(而不是实时)批量发布所有事件。

将不胜感激,因为对于较小的体积,它似乎可以工作。

apache-kafka apache-kafka-streams spring-cloud-stream
1个回答
0
投票

KStream.aggregate():Kafka Stream使用record cache来控制从aggregate的物化视图(或KTable)向状态存储和下游处理器发出汇总更新的速率。例如:

    with messages: ("word1", 4) ("word1", 2) ("word2", 3) ("word1", 1) and your word count topolgy: KStream.groupByKey() .aggregate(() -> 0, (word, newWordCnt, aggsWordCnt) -> aggsWordCnt + newWordCnt, Materialized.as("word-cnt-store").withValueSerde(Serdes.Integer()) .toStream(); you may received downstream messages like this: ("word1", 6) ("word2", 3) ("word1", 7)
  1. 如果您的接收器是幂等的,您可以使用键覆盖TransactionAccumulator,也可以按[此处]所述使用KTable.suppress()以只发出聚合窗口2的最后一条消息。
© www.soinside.com 2019 - 2024. All rights reserved.