我正在尝试使用Spring Integration 4.3和Spring Boot 1.6将项目升级到Spring Integration 5.1和Spring Boot 2.1。以前我有以下配置:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("myId")
.autoStartup(autoStartup)
.prefetchCount(10)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> a.correlationExpression("payload.entityId")
.releaseExpression("size() eq iterator().next().payload.batchSize")
.sendPartialResultOnExpiry(true)
.groupTimeout(2000)
.expireGroupsUponCompletion(true)
.outputProcessor(myMessageGroupProcessor))
.handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
.get();
在升级过程中,我试图遵循文档here,从而将配置更改为:
@Configuration
@EnableAutoConfiguration
@EnableIntegration
public class SpringConfig {
@Bean(name = "myFlowId")
public IntegrationFlow myFlow(ConnectionFactory connectionFactory, ServiceActivatorBean serviceActivatorBean,
@Value("${spring.integration.flow.auto-startup:true}") boolean autoStartup,
MyMessageGroupProcessor myMessageGroupProcessor) {
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("myId")
.autoStartup(autoStartup)
.configureContainer(c -> c.acknowledgeMode(MANUAL)
.prefetchCount(10)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
)
.messageConverter(messageConverter()))
.aggregate(a -> a.correlationExpression("payload.entityId")
.releaseExpression("size() eq one().payload.batchSize")
.sendPartialResultOnExpiry(true)
.groupTimeout(2000)
.expireGroupsUponCompletion(true)
.outputProcessor(myMessageGroupProcessor))
.handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
.get();
}
}
但是当我发布消息时,它们似乎并不是由集成流接收/处理的。即使我启用调试日志记录,我也没有得到任何错误日志(或任何日志),我不确定从哪里开始调试。我很肯定这些消息实际发布到RabbitMQ,所以这不是问题所在。我能错过什么?
我的问题实际上不是由于Spring Integration,而是与Spring AMQP的变化有关。以前“声明”可以像这样创建:
@Bean
List<Binding> myBinding() {
return List.of(<binding1>, <binding2>, ..)
}
但在Spring AMQP 2.1中,这应该改为:
@Bean
Declarables myBinding() {
return new Declarables(List.of(<binding1>, <binding2>, ..))
}
请参阅文档here。
顺便说一句,我的releaseExpression
也错了,它应该是size() eq one.payload.batchSize
。