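We are extending the RabbitMQ source connector in SCDF, running on the Kafka binder, to add extra functionality that can be extended with functions, so that additional processing is applied before a message is delivered to its final destination.
My problem is that, currently, when the application runs as a Spring Boot app, the output is delivered to the default topic named "output". Is there a way to specify a named queue (topic) instead of the default output topic?
Is this a known limitation because the Rabbit source is based on Boot 2.1.x? If so, is there another way to achieve similar functionality using a Supplier function with the latest RabbitMQ listener?
Sample application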
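import java.util.function.Function;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;

@SpringBootApplication
@Import(org.springframework.cloud.stream.app.rabbit.source.RabbitSourceConfiguration.class)
public class RabbitSourceApp {

    public static void main(String[] args) {
        SpringApplication.run(RabbitSourceApp.class, args);
    }

    // Function meant to post-process each message before it reaches the destination
    @Bean
    public Function<String, String> upper() {
        return value -> value.toUpperCase();
    }
}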
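To make the intent of upper() concrete, here is a minimal standalone sketch of the same lambda outside Spring. The class name UpperSketch and the sample payload are only illustrative assumptions; nothing here touches the binder or RabbitMQ.

```java
import java.util.function.Function;

// Hypothetical helper class, not part of the application above.
public class UpperSketch {

    // Same lambda as the upper() bean, without the Spring annotations.
    static Function<String, String> upper() {
        return value -> value.toUpperCase();
    }

    public static void main(String[] args) {
        // Apply the function to a sample String payload and print the result.
        System.out.println(upper().apply("hello from TestQ"));
    }
}
```

Running main prints the payload in upper case, which is the transformation I expect to see on messages arriving at upperQ.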
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example.source</groupId>
    <artifactId>source.sample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
        <relativePath></relativePath>
    </parent>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-cloud.version>Hoxton.SR1</spring-cloud.version>
        <spring-cloud.schema.version>2.2.1.RELEASE</spring-cloud.schema.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud.stream.app</groupId>
            <artifactId>spring-cloud-starter-stream-source-rabbit</artifactId>
            <version>2.1.3.RELEASE</version>
            <exclusions>
                <exclusion> <!-- declare the exclusion here -->
                    <groupId>io.pivotal.spring.cloud</groupId>
                    <artifactId>spring-cloud-services-starter-config-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-schema</artifactId>
            <version>${spring-cloud.schema.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
application.yml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:29092
      bindings:
        upper-out-0:
          destination: upperQ
      function:
        definition: upper
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
rabbit.queues: TestQ
This is my workaround, but I am not sure whether it is the right solution.
bindings:
  output:
    destination: upperQ