带有RabbitMQ绑定器的Spring云流抛出java.lang.IllegalStateException:消息体太大

问题描述 投票:0回答:1

我使用 Spring Cloud Stream 4.0.4 和 Rabbitmq Binder 4.0.4 来处理 Spring Boot 3.1.3 应用程序中的消息。我也在使用rabbitMQ 3.10.0。问题是,当消息大小为 94MB 时,我的应用程序中存在一个错误,我会得到一个异常:

java.lang.IllegalStateException: Message body is too large (94664210), maximum configured size is 67108864. See ConnectionFactory#setMaxInboundMessageBodySize if you need to increase the limit.
    at com.rabbitmq.client.impl.CommandAssembler.consumeHeaderFrame(CommandAssembler.java:109) ~[amqp-client-5.17.1.jar!/:5.17.1]
    at com.rabbitmq.client.impl.CommandAssembler.handleFrame(CommandAssembler.java:172) ~[amqp-client-5.17.1.jar!/:5.17.1]
    at com.rabbitmq.client.impl.AMQCommand.handleFrame(AMQCommand.java:105) ~[amqp-client-5.17.1.jar!/:5.17.1]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:115) ~[amqp-client-5.17.1.jar!/:5.17.1]
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:746) ~[amqp-client-5.17.1.jar!/:5.17.1]
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) ~[amqp-client-5.17.1.jar!/:5.17.1]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:673) ~[amqp-client-5.17.1.jar!/:5.17.1]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

我的应用程序.属性:

spring.cloud.function.definition=result
spring.cloud.stream.bindings.result-in-0.group=coreGroup
spring.cloud.stream.bindings.result-in-0.destination=result

Bean(它正在调用简单的auditRepository.save(auditRecord)):

    @Bean
    public Consumer<ResultMessage> result() {
            return auditController::storeAuditRecord;
    }

我尝试在 application.properties 中使用 DLQ 设置 DLX,如下所示:

spring.cloud.stream.bindings.result-in-0.consumer.max-attempts=2
spring.cloud.stream.rabbit.bindings.result-in-0.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.result-in-0.consumer.dead-letter-exchange=resultDLX
spring.cloud.stream.rabbit.bindings.result-in-0.consumer.error-channel-enabled=true

我也尝试过设置最大尺寸:

spring.cloud.stream.rabbit.bindings.result-in-0.consumer.max-message-size=209715200

但它总会崩溃。消息不会推送到 DLQ 并且设置最大大小没有执行任何操作。

我并不强制处理这么大的消息,我可以以某种方式检查代码中的大小,但有时有重要的有效负载,可以高达 100MB(这种情况发生在千分之一的情况下)。

抛出异常后,消费者将无法工作,应用程序将停止为用户工作,因为他们在保存审核记录后无法获取结果负载。

如何将此消息推送到 DLQ 以便将来调查?

java spring-boot spring-cloud-stream spring-amqp spring-rabbit
1个回答
0
投票

这是由于最近对 amqp-client 进行了更改,以防止消息非常大时发生 OOM;在 Spring 收到消息之前检测到错误;因此 Spring 无法将其发送到 DLQ。

/**
 * Maximum body size of inbound (received) messages in bytes.
 *
 * <p>Default value is 67,108,864 (64 MiB).
 *
 * @param maxInboundMessageBodySize the maximum size of inbound messages
 */
public void setMaxInboundMessageBodySize(int maxInboundMessageBodySize) {

您可以通过向应用程序添加定制器 bean 来增加允许的大小。例如将其增加到 128MiB:

@Bean
ConnectionFactoryCustomizer cust() {
    return cf -> cf.setMaxInboundMessageBodySize(1024 * 1024 * 128);
}
© www.soinside.com 2019 - 2024. All rights reserved.