是否指定了Kafka Streams拓扑的处理顺序?

问题描述 投票:0回答:1

我想知道是否指定了流拓扑处理消息的顺序。

示例:

        // read input messages

        KStream<String, String> inputMessages = builder.stream("demo_input_topic_1");
        inputMessages = inputMessages.peek((k, v) -> System.out.println("TECHN. NEW MESSAGE: key: " + k + ", value: " + v));

        // check if message was already processed

        KTable<String, Long> alreadyProcessedMessages = inputMessages.groupByKey().count();
        KStream<String, String> newMessages =
                inputMessages.leftJoin(alreadyProcessedMessages, (streamValue, tableValue) -> getMessageValueOrNullIfKnownMessage(streamValue, tableValue));
        KStream<String, String> filteredNewMessages =
                newMessages.filter((key, val) -> val != null).peek((k, v) -> System.out.println("FUNC. NEW MESSAGE: key: " + k + ", value: " + v));

        // process the message

        filteredNewMessages.map((key, value) -> KeyValue.pair(key, "processed message: " + value))
                .peek((k, v) -> System.out.println("PROCESSED MESSAGE: key: " + k + ", value: " + v)).to("demo_output_topic_1");

使用getMessageValueOrNullIfKnownMessage(...)

    private static String getMessageValueOrNullIfKnownMessage(String newMessageValue, Long messageCounter) {
        if (messageCounter > 1) {
            return null;
        }

        return newMessageValue;
    }

因此,在示例中只有一个输入主题和一个输出主题。

alreadyProcessedMessages中对输入主题进行计数(因此将创建本地状态)。同样,输入主题与计数表alreadyProcessedMessages联接在一起,联接的结果是流newMessages(如果消息计数> 1,则此流中的消息值为null。邮件的原始值)。

然后,过滤newMessages的消息(过滤出null值,并将结果写入输出主题。

因此,此最小流的作用是:它将所有具有新密钥(以前未处理过的密钥)的消息从输入主题写入输出主题。

在测试中,流有效。但是我认为这并不能保证。它仅能工作,因为消息在加入之前先由计数节点处理。

但是该订单有保证吗?

据我在所有文档中看到的,不能保证此处理顺序。因此,如果收到新消息,也可能会发生这种情况:

  • 消息由“加入节点”处理。
  • 该消息由“计数节点”处理。

这当然会产生不同的结果(因此,在这种情况下,如果第二次出现具有相同密钥的消息,由于尚未计数,它仍将与原始值结合在一起。]

那么在某处指定了处理顺序吗?

我知道在新版本的Kafka中,将根据输入分区中消息的时间戳完成KStream-KTable连接。但这在这里无济于事,因为拓扑使用相同的输入分区(因为它具有相同的消息)。

谢谢

apache-kafka stream apache-kafka-streams
1个回答
0
投票

这只是部分答案,可以缩小悬而未决的问题:

In(Confluent's Stream Architecture overview)中指出,“深度优先处理策略”用于遍历拓扑。没有提及在多个路径上相同输入可以到达的节点上进行同步。(但是,在1的详细程度上,基于此可以将其排除在外。)

关于DFS遍历的分支顺序,我没有找到明确的陈述。但是,在此Confluent documenation on namings within the topology中,通过一些示例显示了“拓扑中的操作员顺序”。现在可以假定这一顺序。这似乎是由DSL运营商在源代码中给出的顺序,也是执行顺序。那将提供您所要求的保证。但是我无法通过其他任何来源证实这一假设。

留下两个问题,可以通过在PAPI实现中找到相关的源代码来回答。

  1. 真的是没有同步点的普通DFS遍历吗?
  2. DFS中的分支顺序是否真的是2中定义的运算符顺序?如果不是,那是什么呢?
© www.soinside.com 2019 - 2024. All rights reserved.