如何使用Spring Cloud Stream中的Kafka Streams绑定器将同一主题绑定到多个函数?

问题描述 投票:0回答:1

使用 Spring Cloud Stream 和 Kafka Streams 绑定器,我想在另一个函数中处理函数的输出,如下所示:

@Bean
public Function<KStream<String, Double>, KStream<String, Double>> sqrt() {
    return numbers -> numbers.mapValues(Math::sqrt);
}

@Bean
public Consumer<KStream<String, Double>> log() {
    return sqrt -> sqrt.foreach((key, value) -> log.info("{}: {}", key, value));
}

其中

sqrt()
输出数字的平方根,然后用
log()
记录。
application.yaml
因此看起来像这样:

spring:
  cloud:
    stream:
      function:
        bindings:
          sqrt-in-0: numbers
          sqrt-out-0: sqrt-numbers
          log-in-0: sqrt-numbers
      kafka:
        streams:
          bindings:
            sqrt:
              consumer:
                application-id: sqrtApplicationId
            log:
              consumer:
                application-id: logApplicationId

启动应用程序时,出现以下错误:

The bean 'sqrt-numbers' could not be registered. A bean with that name has already been defined and overriding is disabled.

Action:

Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true

当然,现在将

definition-overriding
设置为
true
并不是一个正确的解决方案,并且它会失败并显示
IllegalStateException

我该如何解决这个问题?

问题的重现可以在这里找到:https://github.com/cedric-schaller/dltawareprocessor-type-error

spring-boot apache-kafka-streams spring-cloud-stream spring-cloud-stream-binder-kafka
1个回答
0
投票

假设您有两个名为

numbers
 sqrt-numbers
的 Kafka 主题,则以下配置应该有效。

spring:
  cloud:
    stream:
      bindings:
        sqrt-in-0:
          destination: numbers
        sqrt-out-0: 
          destination: sqrt-numbers
        log-in-0: 
          destination: sqrt-numbers
      kafka:
        streams:
          bindings:
            sqrt-in-0:
              consumer:
                application-id: sqrtApplicationId
            log-in-0:
              consumer:
                application-id: logApplicationId

您可以使用

spring.cloud.stream.function.bindings..
覆盖默认绑定名称。例如,如果您想将绑定名称从
sqrt-in-0
更改为
input
,您可以像
spring.cloud.stream.function.bindings.sqrt-in0-0: input
那样进行操作。不过,您仍然需要在覆盖的绑定上设置
destination
(通过
spring.cloud.stream.bindings.input.destination
)。

您遇到的特定异常是因为您试图重用已创建的绑定名称 -

sqrt-numbers

© www.soinside.com 2019 - 2024. All rights reserved.