使用Function接口创建的Spring云流应用程序无法在Spring云数据流中进行数据通信。

问题描述 投票:0回答:1

我使用Spring云函数方法转移Flux,创建了3个简单的Spring云流应用(SourceProcessorSink)。

Source-app。

@SpringBootApplication
public class SourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(SourceApplication.class, args);
    }

    @PollableBean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> {
            String v1 = String.valueOf("abc");
            String v2 = String.valueOf("pqr");
            String v3 = String.valueOf("xyz");
            return Flux.just(v1, v2, v3);
        };
    }
}

处理器-应用。

@SpringBootApplication
public class ProcessorApplication {

    @Bean
    public Function<Flux<String>, Flux<String>> uppercase() {
        return flux -> flux.map(value -> value.toUpperCase()).log();
    }

    public static void main(String[] args) {
        SpringApplication.run(ProcessorApplication.class, args);
    }
}

Sink-app。

@SpringBootApplication
public class SinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(SinkApplication.class, args);
    }

    @Bean
    public Consumer<Flux<String>> log() {
        return flux -> {
            flux.subscribe(f -> System.out.println("Received data: " + f));
        };
    }
}

我所添加的依赖关系是。

SpringBoot version = 2.2.6.RELEASE

implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
implementation("org.springframework.cloud:spring-cloud-starter-stream-rabbit")
implementation("org.springframework.cloud:spring-cloud-starter-function-webflux:3.0.7.RELEASE")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.boot:spring-boot-starter-webflux")

我已经在Spring Cloud Data Flow中注册了这些应用,并部署在一个流中。

我能够分别通过HTTP和RabbitMQ向这些应用程序传输数据并接收输出。但是,消息并没有在应用程序之间进行通信(Source->Processor->sink)。我是否缺少任何依赖性、注释或应用程序属性。

目前我的应用属性文件完全是空的。

spring-boot spring-cloud-stream spring-cloud-dataflow spring-cloud-function
1个回答
0
投票

你需要设置spring.cloud.stream.function.bindings.-in-0=input。参见 https:/cloud.iospring-cloud-staticspring-cloud-streamcurrentreferencehtmlspring-cloud-stream.html#_programming_model。. 我们希望在未来的Dataflow版本中实现自动化。

© www.soinside.com 2019 - 2024. All rights reserved.