下面是分支代码,它仅流到一个主题(第一个主题)。据我了解,它应该流到所有三个主题?
无论如何,我可以使用分支流化为三个主题?
@Bean
public Function<KStream<String, Usesr>, KStream<String, User>[]> testprocess() {
Predicate<String, User> stream1 = (k, v) -> v != null;
Predicate<String, User> stream2 = (k, v) -> v != null;
Predicate<String, User> stream3 = (k, v) -> v != null;
return input -> input.map(
(key, user) -> new KeyValue<String, User>(user.getId(), user))
.branch(stream1, stream2, stream3);
处理器的配置
testprocess-in-0:
destination: input.users
testprocess-out-0:
destination: users.test.out.0
testprocess-out-1:
destination: users.test.out.1
testprocess-out-2:
destination: users.test.out.2
通过查看您的谓词,似乎第一个谓词总是赢,而其他谓词则没有机会。在Kafka Streams分支中,评估为true的第一个谓词成功,并且相应的主题接收数据。您需要更改谓词中的逻辑,以便正确映射正确的主题。 Here是一个示例。