Named destination for an extended Spring Cloud Stream source


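We are extending the RabbitMQ source connector in SCDF, running on the Kafka binder, and adding extra functionality that can be extended with functions to perform additional processing before a message is delivered to its final destination.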

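My problem is that currently, when the application runs as a Spring Boot app, the output is delivered to the default topic "output". Is there any way to specify a named queue instead of the default output topic?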

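Is this a known limitation because the rabbit source is based on Boot 2.1.x? If so, is there another way to achieve similar functionality with a Supplier function on top of the latest RabbitMQ listener?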
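For context, the Supplier-based approach mentioned above can be sketched in plain Java. This is only an illustration of composing a source with a processing step using `java.util.function`; it involves no Spring or RabbitMQ code, and `rabbitSupplier`, `compose`, and the sample payload are hypothetical names made up for the sketch.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java sketch: no Spring or RabbitMQ involved.
class SupplierCompositionSketch {

    // Hypothetical stand-in for a source that would read from RabbitMQ.
    static Supplier<String> rabbitSupplier() {
        return () -> "hello from TestQ";
    }

    // Same transformation as the upper() bean in the question.
    static Function<String, String> upper() {
        return value -> value.toUpperCase();
    }

    // Compose a supplier and a function into a single supplier.
    static <T, R> Supplier<R> compose(Supplier<T> source, Function<T, R> step) {
        return () -> step.apply(source.get());
    }

    public static void main(String[] args) {
        Supplier<String> composed = compose(rabbitSupplier(), upper());
        System.out.println(composed.get());
    }
}
```

In a functional-style Spring Cloud Stream application the framework would perform this composition itself when the function definition chains the beans with a pipe, so the `compose` helper here only stands in for that step.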

Sample application

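import java.util.function.Function;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;

@SpringBootApplication
@Import(org.springframework.cloud.stream.app.rabbit.source.RabbitSourceConfiguration.class)
public class RabbitSourceApp {
    public static void main(String[] args) {
        SpringApplication.run(RabbitSourceApp.class, args);
    }

    // Extra processing step applied to each message before it reaches the destination.
    @Bean
    public Function<String, String> upper() {
        return value -> value.toUpperCase();
    }
}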

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.source</groupId>
    <artifactId>source.sample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
        <relativePath></relativePath>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-cloud.version>Hoxton.SR1</spring-cloud.version>
        <spring-cloud.schema.version>2.2.1.RELEASE</spring-cloud.schema.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud.stream.app</groupId>
            <artifactId>spring-cloud-starter-stream-source-rabbit</artifactId>
            <version>2.1.3.RELEASE</version>
            <exclusions>
                <exclusion>  <!-- declare the exclusion here -->
                    <groupId>io.pivotal.spring.cloud</groupId>
                    <artifactId>spring-cloud-services-starter-config-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-schema</artifactId>
            <version>${spring-cloud.schema.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

application.yml

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:29092
      bindings:
        upper-out-0:
          destination: upperQ
      function:
        definition: upper
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
rabbit.queues: TestQ
1 Answer

This is what worked for me, though I am not sure whether it is the correct solution.

      bindings:
        output:
          destination: upperQ
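
A likely reason this works, stated here as an assumption rather than confirmed from the starter's source: the 2.1.x rabbit source appears to be annotation-based and bound to the Source interface, whose channel is named "output", so the upper function is composed onto that existing binding. The upper-out-0 style of name would only apply to a purely functional application. The sketch below uses a hypothetical helper class, BindingNames, which is not part of Spring Cloud Stream, to show how the two property keys differ.

```java
// Hypothetical helper, not part of Spring Cloud Stream: it only mirrors the
// documented naming convention for functional bindings.
class BindingNames {

    // Definition "upper", index 0 gives "upper-out-0"; a composed definition
    // such as "a|b" drops the pipe and gives "ab-out-0".
    static String functionalOutput(String definition, int index) {
        return definition.replace("|", "") + "-out-" + index;
    }

    // Full property key that sets the destination of a binding.
    static String destinationProperty(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }

    public static void main(String[] args) {
        // Functional-style application: binding name derived from the function name.
        System.out.println(destinationProperty(functionalOutput("upper", 0)));
        // Annotation-based source (assumed for the 2.1.x rabbit source): channel "output".
        System.out.println(destinationProperty("output"));
    }
}
```

Under that assumption, the key that takes effect for the application in the question is the second one, which matches the bindings.output.destination entry shown above.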