Spring Cloud Stream - Override the error handler


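I have a multi-binding Spring Cloud Stream application, and my implementation closely follows the one explained in the "Retry With the RabbitMQ Binder" documentation.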

I intend to retry a configured number of times and eventually give up.

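If the error looks like the one below, the message never even reaches the listener code where I apply the x-death count logic. So the question is how to give up on messages that fail with deserialization and message conversion errors: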

2018-09-06 16:41:11.889 ERROR [data-connector,671345aea4270626,48dd1100d9e332f9,false] 8866 --- [rdedStarGroup-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.ducation.connector.event.domain.AwardResource] for GenericMessage [payload=byte[369], headers={amqp_deliveryTag=1, amqp_consumerQueue=outcome.awardedStar.dataConnectorAwardedStarGroup, amqp_redelivered=false, notificationType=AwardResource, spanTraceId=671345aea4270626, spanId=671345aea4270626, amqp_receivedRoutingKey=outcome.awardedStar.dataConnectorAwardedStarGroup, nativeHeaders={spanTraceId=[671345aea4270626], spanId=[671345aea4270626], spanSampled=[0]}, x-first-death-exchange=outcome.awardedStar, X-B3-SpanId=671345aea4270626, x-death=[{reason=expired, count=65, exchange=DLX, routing-keys=[outcome.awardedStar.dataConnectorAwardedStarGroup], time=Thu Sep 06 16:21:21 GST 2018, queue=outcome.awardedStar.dataConnectorAwardedStarGroup.dlq}, {reason=rejected, count=65, exchange=outcome.awardedStar, routing-keys=[#], time=Thu Sep 06 16:21:16 GST 2018, queue=outcome.awardedStar.dataConnectorAwardedStarGroup}], x-first-death-reason=rejected, X-B3-Sampled=0, x-first-death-queue=outcome.awardedStar.dataConnectorAwardedStarGroup, X-B3-TraceId=671345aea4270626, id=865fe31f-8cb4-fab1-ce1f-7673e741ac48, amqp_consumerTag=amq.ctag-Mz8gBRwIdx6czW6JCox2-w, spanSampled=0, contentType=application/plain, timestamp=1536237671843}], failedMessage=GenericMessage [payload=byte[369], headers={amqp_deliveryTag=1, amqp_consumerQueue=outcome.awardedStar.dataConnectorAwardedStarGroup, amqp_redelivered=false, notificationType=AwardResource, spanTraceId=671345aea4270626, spanId=671345aea4270626, amqp_receivedRoutingKey=outcome.awardedStar.dataConnectorAwardedStarGroup, nativeHeaders={spanTraceId=[671345aea4270626], spanId=[48dd1100d9e332f9], spanSampled=[0], X-B3-TraceId=[671345aea4270626], X-B3-SpanId=[48dd1100d9e332f9], 
X-B3-ParentSpanId=[671345aea4270626], spanParentSpanId=[671345aea4270626], X-B3-Sampled=[0]}, x-first-death-exchange=outcome.awardedStar, X-B3-SpanId=671345aea4270626, x-death=[{reason=expired, count=65, exchange=DLX, routing-keys=[outcome.awardedStar.dataConnectorAwardedStarGroup], time=Thu Sep 06 16:21:21 GST 2018, queue=outcome.awardedStar.dataConnectorAwardedStarGroup.dlq}, {reason=rejected, count=65, exchange=outcome.awardedStar, routing-keys=[#], time=Thu Sep 06 16:21:16 GST 2018, queue=outcome.awardedStar.dataConnectorAwardedStarGroup}], x-first-death-reason=rejected, X-B3-Sampled=0, x-first-death-queue=outcome.awardedStar.dataConnectorAwardedStarGroup, X-B3-TraceId=671345aea4270626, id=865fe31f-8cb4-fab1-ce1f-7673e741ac48, amqp_consumerTag=amq.ctag-Mz8gBRwIdx6czW6JCox2-w, spanSampled=0, contentType=application/plain, timestamp=1536237671843}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109)
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:165)
    at org.springframework.cloud.stream.binding.DispatchingStreamListenerMessageHandler.handleRequestMessage(DispatchingStreamListenerMessageHandler.java:87)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:60)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:240)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:207)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
    at java.lang.Thread.run(Thread.java:748)

2018-09-06 16:41:11.900  WARN [data-connector,,,] 8866 --- [rdedStarGroup-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1506) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1417) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817) [spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801) [spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) [spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042) [spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.ducation.connector.event.domain.AwardResource] for GenericMessage [payload=byte[369], headers={amqp_deliveryTag=1, amqp_consumerQueue=outcome.awardedStar.dataConnectorAwardedStarGroup, amqp_redelivered=false, notificationType=AwardResource, spanTraceId=671345aea4270626, spanId=671345aea4270626, amqp_receivedRoutingKey=outcome.awardedStar.dataConnectorAwardedStarGroup, nativeHeaders={spanTraceId=[671345aea4270626], spanId=[671345aea4270626], spanSampled=[0]}, x-first-death-exchange=outcome.awardedStar, X-B3-SpanId=671345aea4270626, x-death=[{reason=expired, count=65, exchange=DLX, routing-keys=[outcome.awardedStar.dataConnectorAwardedStarGroup], time=Thu Sep 06 16:21:21 GST 2018, queue=outcome.awardedStar.dataConnectorAwardedStarGroup.dlq}, {reason=rejected, count=65, exchange=outcome.awardedStar, routing-keys=[#], time=Thu Sep 06 16:21:16 GST 2018, queue=outcome.awardedStar.dataConnectorAwardedStarGroup}], x-first-death-reason=rejected, X-B3-Sampled=0, x-first-death-queue=outcome.awardedStar.dataConnectorAwardedStarGroup, X-B3-TraceId=671345aea4270626, id=865fe31f-8cb4-fab1-ce1f-7673e741ac48, amqp_consumerTag=amq.ctag-Mz8gBRwIdx6czW6JCox2-w, spanSampled=0, contentType=application/plain, timestamp=1536237671843}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116) ~[spring-messaging-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137) ~[spring-messaging-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109) ~[spring-messaging-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:165) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.cloud.stream.binding.DispatchingStreamListenerMessageHandler.handleRequestMessage(DispatchingStreamListenerMessageHandler.java:87) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:60) ~[spring-integration-amqp-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:240) ~[spring-integration-amqp-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:207) ~[spring-integration-amqp-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    ... 8 common frames omitted

Note: I am aware of the error itself and its cause. It is intentional, to test the x-death logic.
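For context, an x-death count check of the kind mentioned in the question might look roughly like the sketch below. It is not the asker's actual code: the class name `XDeathRetryPolicy`, the `maxDeaths` threshold, and the choice to take the highest `count` across all x-death entries are assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper: decides whether to give up based on the x-death header. */
final class XDeathRetryPolicy {

    private final long maxDeaths;

    XDeathRetryPolicy(long maxDeaths) {
        this.maxDeaths = maxDeaths;
    }

    /** Returns the highest "count" found in the x-death entries, or 0 if the header is absent. */
    static long deathCount(List<? extends Map<?, ?>> xDeath) {
        if (xDeath == null) {
            return 0L; // first delivery: the message has never been dead-lettered
        }
        long max = 0L;
        for (Map<?, ?> entry : xDeath) {
            Object count = entry.get("count");
            if (count instanceof Number) {
                max = Math.max(max, ((Number) count).longValue());
            }
        }
        return max;
    }

    /** True once the message has been dead-lettered at least maxDeaths times. */
    boolean shouldGiveUp(List<? extends Map<?, ?>> xDeath) {
        return deathCount(xDeath) >= maxDeaths;
    }
}
```

A check like this only helps when the listener method is actually invoked. In the trace above, the failure happens during argument resolution, before any such logic can run.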

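If what I expect is not possible, what is the best practice for catching such exceptions so that the application stops retrying indefinitely?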

spring-cloud-stream
1 answer

Under normal circumstances, MessageConversionExceptions are considered fatal (unrecoverable) and are never requeued; they go to the DLQ if one is configured.

See the Spring AMQP reference documentation (exception handling) for the list of such exceptions.

However, since you are rerouting the DLQ messages back to the original queue, that defeats the "fatal" nature of these errors.

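I think the only practical workaround is to use republishToDlq and, instead of routing the dead letters back to the original queue, have them land in another queue with another listener on it (raw data, no conversion). That listener filters out messages that carry this exception in their headers and forwards the others to the original queue.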

But I will open an issue against the framework.

https://jira.spring.io/browse/AMQP-833

EDIT

Here is a Spring Cloud Stream application that uses a two-DLQ approach to discard the unrecoverable messages...

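import java.util.List;
import java.util.Map;

import com.rabbitmq.client.LongString;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;

@SpringBootApplication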
@EnableBinding(So52209397Application.Channels.class)
public class So52209397Application {

    public static void main(String[] args) {
        SpringApplication.run(So52209397Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            template.convertAndSend("foo.group", "{\"bar\":\"baz\"}", m -> {
                m.getMessageProperties().setContentType("application/json");
                return m;
            }); // good
            template.convertAndSend("foo.group", "junk", m -> {
                m.getMessageProperties().setContentType("application/json");
                return m;
            }); // bad: not valid JSON, so conversion to Foo fails
        };
    }

    // Main listener: fails the first delivery (no x-death header yet) to exercise the DLQ path
    @StreamListener("input")
    public void listen(Foo foo, @Header(value = "x-death", required = false) List<Map<?, ?>> xDeath) {
        System.out.println(foo);
        if (xDeath == null) {
            throw new RuntimeException("fail first time (with retries)");
        }
        System.out.println(xDeath);
    }

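    // Raw listener on the DLQ: no payload conversion; inspects the stack trace header set when republishing to the DLQ
    @StreamListener("inputDlq")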
    public void listen(byte[] failed, @Header(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE) LongString stackTrace) {
        System.out.println("StreamErrorHander: " + new String(failed));
        if (stackTrace.toString().contains("MessageConversionException")) { // should add other fatal errors here too
            System.out.println("Discarding message");
        }
        else {
            throw new RuntimeException("Good exception; sending back to original queue");
        }
    }

    public interface Channels {

        @Input
        public MessageChannel input();

        @Input
        public MessageChannel inputDlq();

    }

    public static class Foo {

        private String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}
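The `contains("MessageConversionException")` check in the DLQ listener is marked "should add other fatal errors here too". One way to generalize it is sketched below as a small standalone helper. The class name `FatalErrorFilter` is hypothetical, and the list of markers is illustrative rather than authoritative; compare it against the exceptions your framework version actually treats as fatal.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: classifies a failure as fatal from its stack trace text. */
final class FatalErrorFilter {

    // Illustrative list (an assumption); extend or trim it to match your setup.
    private static final List<String> FATAL_MARKERS = Arrays.asList(
            "MessageConversionException",
            "MethodArgumentNotValidException",
            "MethodArgumentTypeMismatchException",
            "ClassCastException");

    /** Returns true if the stack trace mentions any of the fatal exception names. */
    static boolean isFatal(String stackTrace) {
        if (stackTrace == null) {
            return false; // no header: treat as recoverable
        }
        for (String marker : FATAL_MARKERS) {
            if (stackTrace.contains(marker)) {
                return true;
            }
        }
        return false;
    }
}
```

In the DLQ listener, the condition could then read `FatalErrorFilter.isFatal(stackTrace.toString())`. Substring matching is coarse: it also matches when one of these names appears anywhere in a cause chain, which may or may not be what you want.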

spring.cloud.stream.bindings.input.destination=foo
spring.cloud.stream.bindings.input.group=group

# dead letters go to the default DLX/rk - DLX/foo.group
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=true


# Raw consumer for failures
spring.cloud.stream.bindings.inputDlq.destination=DLX
spring.cloud.stream.bindings.inputDlq.group=foo.group.dlq

# disable retry for dlq listener
spring.cloud.stream.bindings.inputDlq.consumer.max-attempts=1

# dead letters are sent to DLX/foo.group.dlq.dlq
spring.cloud.stream.rabbit.bindings.inputDlq.consumer.exchange-type=direct
spring.cloud.stream.rabbit.bindings.inputDlq.consumer.ttl=5000
spring.cloud.stream.rabbit.bindings.inputDlq.consumer.queue-name-group-only=true
spring.cloud.stream.rabbit.bindings.inputDlq.consumer.binding-routing-key=foo.group
spring.cloud.stream.rabbit.bindings.inputDlq.consumer.auto-bind-dlq=true

# foo.group.dlq.dlq routes expiries back to main queue via default exchange (could also be foo)
spring.cloud.stream.rabbit.bindings.inputDlq.consumer.dlq-dead-letter-exchange=
spring.cloud.stream.rabbit.bindings.inputDlq.consumer.dlq-dead-letter-routing-key=foo.group

spring.cloud.stream.rabbit.bindings.inputDlq.consumer.dlq-ttl=1

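The bottom line is that the main consumer dead-letters everything to ...dlq, which is consumed by the "raw" listener. That listener discards fatal errors and dead-letters "good" failures to ...dlq.dlq, which has a short TTL and routes back to the main queue.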

Foo [bar=baz]
Foo [bar=baz]
Foo [bar=baz]
...
StreamErrorHander: {"bar":"baz"}
...
StreamErrorHander: junk
Discarding message
Foo [bar=baz]
[{reason=expired, count=1, exchange=DLX, routing-keys=[foo.group.dlq]...
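The two-DLQ flow can also be modeled without a broker, which may help when reasoning about why the loop terminates. The sketch below is a toy model only: the class `TwoDlqFlowModel`, its parameters, and the step labels are invented for illustration and do not correspond to any Spring or RabbitMQ API.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the main queue -> dlq -> dlq.dlq -> main queue round trip. */
final class TwoDlqFlowModel {

    /**
     * Simulates one message.
     *
     * @param convertible whether the payload can be converted by the main listener
     * @param failuresBeforeSuccess how many times the main listener throws a recoverable error
     * @return the ordered list of steps the message went through
     */
    static List<String> process(boolean convertible, int failuresBeforeSuccess) {
        List<String> trace = new ArrayList<>();
        int roundTrips = 0;
        while (true) {
            if (!convertible) {
                // Fatal error: republished to the DLQ, where the raw listener drops it.
                trace.add("main: conversion failed -> dlq");
                trace.add("dlq: fatal error, discarded");
                return trace;
            }
            if (roundTrips < failuresBeforeSuccess) {
                // Recoverable error: the raw listener rejects it, TTL expiry sends it back.
                trace.add("main: listener failed -> dlq");
                trace.add("dlq: recoverable error -> dlq.dlq");
                trace.add("dlq.dlq: expired -> main");
                roundTrips++;
            } else {
                trace.add("main: processed after " + roundTrips + " round trip(s)");
                return trace;
            }
        }
    }
}
```

Here `process(false, 0)` corresponds to the "junk" message and `process(true, 1)` to the `{"bar":"baz"}` message, which fails once and is then processed with an x-death header present.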