是否有可能具有执行以下操作的拓扑(简化示例):
/ --> branch1 --> output sink
/
(success)
/
source ---> stream ---> /
^ \
| \
| (failure)
| \
| \ --> branch2 --> retry processor -->|
| |
\--< merge back if successful after x attempts <--/
基本上,决定事件是否进入输出接收器的谓词是一个可能失败的外部调用,如果失败,我们将事件发送到默认分支(分支 2),重试处理器将重试几次,如果调用成功我们转发事件并将结果流合并回初始流,以便它有机会转到输出主题。
有没有办法让这个工作?我已经设法让它一直到重试处理器,它完成它的工作并在成功后转发事件。但我一直无法让活动回到最初的流。
我对来自重试处理器的结果 Kstream 与原始流应用合并操作。
示例代码:
KStream<String, String> stream = builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
var branches = stream.split(Named.as("type"))
.branch((key, value) -> {
try {
return someOperationThatMightFail();
} catch (Exception e) {
return false;
}
}, Branched.withFunction(kstream -> kstream,"branch1"))
.defaultBranch(Branched.withFunction(kstream -> kstream,"branch2"));
var successBranch = branches.get("branch1");
var failedBranch = branches.get("branch2");
successBranch.to("output-topic");
failedBranch.transform(retrySupplier, Named.as("retry")).merge(stream);
另一个选择是在重试处理器之后将它与成功的 branch1 合并,但我似乎也无法做到这一点。
似乎有效,我遇到的问题与转发的事件有关,没有正确的时间戳。