多个春云流应用一起运行

问题描述 投票:0回答:1

我指的是张贴的例子 此处. 我正试图一起运行多个spring cloud stream应用程序。这里,第一个应用程序的输出作为输入给其他应用程序。下面是我想做的事情。

@Bean
    public Function<KStream<FormUUID, FormData>, KStream<UUID, Application>> process()
    {
        //do some processing here and return 
    }
// read output from above process and join it with an event stream
@Bean
    public BiConsumer<KStream<UUID, ProcessEvent>, KTable<UUID, Application>> listen()
    {

        return (eventStream,appTable )-> eventStream
                .join(appTable, (event, app) -> app).foreach((k, app) -> app.createQuote());

    }

application.yml如下所示

spring.cloud:
 function: process;listen
 stream:
  kafka.streams:
    bindings:
      process-in-0.consumer.application-id: form-aggregator
      listen-in-0.consumer.application-id: event-processor
      listen-in-1.consumer.application-id: event-processor
    binder.configuration:
      default.key.serde: org.springframework.kafka.support.serializer.JsonSerde
      default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
      spring.json.key.default.type: com.xxx.datamapper.domain.FormUUID
      spring.json.value.default.type: com.xxx.datamapper.domain.FormData
      commit.interval.ms: 1000
  bindings:
    process-in-0.destination: FORM_DATA_TOPIC
    process-out-0.destination: APPLICATION_TOPIC
    listen-in-0.destination: APPLICATION_TOPIC
    listen-in-1.destination: PROCESS_TOPIC

以上配置会抛出

java.lang.IllegalStateException: Multiple functions found, but function definition property is not set.

如果我尝试使用以下配置

spring.cloud.stream.function.definition: processAndListen

然后,我的应用程序工作,但第二个流配置(定义在监听Bean)没有得到执行。

spring-kafka spring-cloud-stream
1个回答
3
投票

在你的属性中,你需要添加这个。

spring.cloud:
 function.definition: process;listen

这应该也能用 spring.cloud.stream.function.definition: process;listen.

什么是 processAndListen. 这种价值从何而来?

© www.soinside.com 2019 - 2024. All rights reserved.