Spring 云流:反应式管道中的异常导致流崩溃,并显示错误调度程序没有订阅者

问题描述 投票:0回答:1

我有以下消费者功能。如果WebClient调用成功就可以了

 @Bean
 fun consumer() = Consumer<Flux<String>> {
   flux ->
     flux.concatMap { s -> 
       WebClient
         .create()
         .get()
         .uri(s)
         .retrieve()
         .bodyToMono(String::class.java)
     }.subscribe()
 }

但是如果 WebClient 调用得到错误响应。它会在日志中输出这样的错误

ERROR [reactor.core.publisher.Operators][error] - Operator called default onErrorDropped reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.web.reactive.function.client.WebClientResponseException

然后它将停止处理所有后续消息。所有后续消息都会转移到 DLQ,并出现错误

org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
,而不会进入消费者

这是收到消息时和 WebClient 收到错误响应后的完整日志

[Thread: pool-4-thread-9] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer][handleDelivery] - Storing delivery for consumerTag: 'amq.ctag-0us5Idm5UakUO1s78PjmYw' with deliveryTag: '4' in Consumer@507f7cd1: tags=[[amq.ctag-0us5Idm5UakUO1s78PjmYw]], channel=Cached Rabbit Channel: AMQChannel
[Thread: app] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer][handle] - Received message: xxx
[Thread: app] DEBUG [org.springframework.integration.mapping.AbstractHeaderMapper$HeaderMatcher][matchHeader] - headerName=[amqp_receivedDeliveryMode] WILL be mapped, matched pattern=*
[Thread: app] DEBUG [org.springframework.integration.mapping.AbstractHeaderMapper$HeaderMatcher][matchHeader] - headerName=[amqp_receivedRoutingKey] WILL be mapped, matched pattern=*
[Thread: app] DEBUG [org.springframework.integration.mapping.AbstractHeaderMapper$HeaderMatcher][matchHeader] - headerName=[amqp_receivedExchange] WILL be mapped, matched pattern=*
[Thread: app] DEBUG [org.springframework.integration.mapping.AbstractHeaderMapper$HeaderMatcher][matchHeader] - headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=*
[Thread: app] DEBUG [org.springframework.integration.mapping.AbstractHeaderMapper$HeaderMatcher][matchHeader] - headerName=[amqp_redelivered] WILL be mapped, matched pattern=*
[Thread: app] DEBUG [org.springframework.retry.support.RetryTemplate][doExecute] - Retry: count=0
[Thread: app] DEBUG [org.springframework.cloud.stream.messaging.DirectWithAttributesChannel][debug] - preSend on channel 'bean 'input'', message: xxx
[Thread: app] DEBUG [org.springframework.retry.backoff.ExponentialBackOffPolicy][backOff] - Sleeping for 1000
[Thread: app] DEBUG [org.springframework.retry.support.RetryTemplate][doExecute] - Checking for rethrow: count=1
[Thread: app] DEBUG [org.springframework.retry.support.RetryTemplate][doExecute] - Retry: count=1
[Thread: app] DEBUG [org.springframework.cloud.stream.messaging.DirectWithAttributesChannel][debug] - preSend on channel 'bean 'input'', message: xxx
[Thread: app] DEBUG [org.springframework.retry.backoff.ExponentialBackOffPolicy][backOff] - Sleeping for 2000
[Thread: app] DEBUG [org.springframework.retry.support.RetryTemplate][doExecute] - Checking for rethrow: count=2
[Thread: app] DEBUG [org.springframework.retry.support.RetryTemplate][doExecute] - Retry: count=2
[Thread: app] DEBUG [org.springframework.cloud.stream.messaging.DirectWithAttributesChannel][debug] - preSend on channel 'bean 'input'', message: xxx
[Thread: app] DEBUG [org.springframework.retry.support.RetryTemplate][doExecute] - Checking for rethrow: count=3
[Thread: app] DEBUG [org.springframework.retry.support.RetryTemplate][doExecute] - Retry failed last attempt: count=3
[Thread: app] DEBUG [org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer][publish] - Sending ErrorMessage: failedMessage: xxx
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:344)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:324)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:297)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262)
    at io.micrometer.observation.Observation.observe(Observation.java:492)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$400(AmqpInboundChannelAdapter.java:69)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:374)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:370)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1718)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1637)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1625)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1616)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1561)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.lambda$executeListener$8(AbstractMessageListenerContainer.java:1540)
    at io.micrometer.observation.Observation.observe(Observation.java:492)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1540)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1001)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:948)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1326)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1232)
    at java.base/java.lang.Thread.run(Thread.java:1589)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=xxx
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 29 more

我猜当 WebClient 收到错误响应时,它会抛出一个错误,导致

ErrorCallbackNotImplemented
,从而导致流崩溃

spring-cloud-stream spring-cloud-dataflow
1个回答
0
投票

这是这里已多次询问并解释过的预期行为。它还记录在案 - https://docs.spring.io/spring-cloud-stream/docs/4.0.4-SNAPSHOT/reference/html/spring-cloud-stream.html#_reactive_functions_support 简而言之。 。 。 Spring Cloud Stream 对反应流没有控制权或可见性。我们只需将两个通量连接在一起。

© www.soinside.com 2019 - 2024. All rights reserved.