Kafka 流分支并在分支之前将其中一个分支合并回流中?

问题描述 投票:0回答:1

是否有可能具有执行以下操作的拓扑(简化示例):


                            / --> branch1 --> output sink
                           /
                      (success)
                         /
source ---> stream ---> /
              ^         \ 
              |          \
              |       (failure)
              |            \ 
              |             \ --> branch2 --> retry processor -->|
              |                                                  |
              \--<  merge back if successful after x attempts <--/

基本上,决定事件是否进入输出接收器的谓词是一个可能失败的外部调用,如果失败,我们将事件发送到默认分支(分支 2),重试处理器将重试几次,如果调用成功我们转发事件并将结果流合并回初始流,以便它有机会转到输出主题。

有没有办法让这个工作?我已经设法让它一直到重试处理器,它完成它的工作并在成功后转发事件。但我一直无法让活动回到最初的流。

我对来自重试处理器的结果 Kstream 与原始流应用合并操作。

示例代码:

KStream<String, String> stream = builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

var branches = stream.split(Named.as("type"))
        .branch((key, value) -> {
            try {
                return someOperationThatMightFail();
            } catch (Exception e) {
                return false;
            }
        }, Branched.withFunction(kstream -> kstream,"branch1"))
        .defaultBranch(Branched.withFunction(kstream -> kstream,"branch2"));

var successBranch = branches.get("branch1");
var failedBranch = branches.get("branch2");

successBranch.to("output-topic");
failedBranch.transform(retrySupplier, Named.as("retry")).merge(stream);

另一个选择是在重试处理器之后将它与成功的 branch1 合并,但我似乎也无法做到这一点。

apache-kafka-streams
1个回答
0
投票

似乎有效,我遇到的问题与转发的事件有关,没有正确的时间戳。

© www.soinside.com 2019 - 2024. All rights reserved.