I have the following consumer function. It works fine as long as the WebClient call succeeds.
@Bean
fun consumer() = Consumer<Flux<String>> { flux ->
    flux.concatMap { s ->
        WebClient
            .create()
            .get()
            .uri(s)
            .retrieve()
            .bodyToMono(String::class.java)
    }.subscribe()
}
However, if the WebClient call gets an error response, the following error is written to the log:
ERROR [reactor.core.publisher.Operators][error] - Operator called default onErrorDropped reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.web.reactive.function.client.WebClientResponseException
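After that, it stops processing all subsequent messages. Every subsequent message is routed to the DLQ with the error
org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
and never reaches the consumer.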
Here is the full log from when a message is received after WebClient has gotten an error response:
[Thread: pool-4-thread-9] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer][handleDelivery] - Storing delivery for consumerTag: 'amq.ctag-0us5Idm5UakUO1s78PjmYw' with deliveryTag: '4' in Consumer@507f7cd1: tags=[[amq.ctag-0us5Idm5UakUO1s78PjmYw]], channel=Cached Rabbit Channel: AMQChannel
[Thread: app] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer][handle] - Received message: xxx
[Thread: app] DEBUG [org.springframework.integration.mapping.AbstractHeaderMapper$HeaderMatcher][matchHeader] - headerName=[amqp_receivedDeliveryMode] WILL be mapped, matched pattern=*
[Thread: app] DEBUG [org.springframework.integration.mapping.AbstractHeaderMapper$HeaderMatcher][matchHeader] - headerName=[amqp_receivedRoutingKey] WILL be mapped, matched pattern=*
[Thread: app] DEBUG [org.springframework.integration.mapping.AbstractHeaderMapper$HeaderMatcher][matchHeader] - headerName=[amqp_receivedExchange] WILL be mapped, matched pattern=*
[Thread: app] DEBUG [org.springframework.integration.mapping.AbstractHeaderMapper$HeaderMatcher][matchHeader] - headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=*
[Thread: app] DEBUG [org.springframework.integration.mapping.AbstractHeaderMapper$HeaderMatcher][matchHeader] - headerName=[amqp_redelivered] WILL be mapped, matched pattern=*
[Thread: app] DEBUG [org.springframework.retry.support.RetryTemplate][doExecute] - Retry: count=0
[Thread: app] DEBUG [org.springframework.cloud.stream.messaging.DirectWithAttributesChannel][debug] - preSend on channel 'bean 'input'', message: xxx
[Thread: app] DEBUG [org.springframework.retry.backoff.ExponentialBackOffPolicy][backOff] - Sleeping for 1000
[Thread: app] DEBUG [org.springframework.retry.support.RetryTemplate][doExecute] - Checking for rethrow: count=1
[Thread: app] DEBUG [org.springframework.retry.support.RetryTemplate][doExecute] - Retry: count=1
[Thread: app] DEBUG [org.springframework.cloud.stream.messaging.DirectWithAttributesChannel][debug] - preSend on channel 'bean 'input'', message: xxx
[Thread: app] DEBUG [org.springframework.retry.backoff.ExponentialBackOffPolicy][backOff] - Sleeping for 2000
[Thread: app] DEBUG [org.springframework.retry.support.RetryTemplate][doExecute] - Checking for rethrow: count=2
[Thread: app] DEBUG [org.springframework.retry.support.RetryTemplate][doExecute] - Retry: count=2
[Thread: app] DEBUG [org.springframework.cloud.stream.messaging.DirectWithAttributesChannel][debug] - preSend on channel 'bean 'input'', message: xxx
[Thread: app] DEBUG [org.springframework.retry.support.RetryTemplate][doExecute] - Checking for rethrow: count=3
[Thread: app] DEBUG [org.springframework.retry.support.RetryTemplate][doExecute] - Retry failed last attempt: count=3
[Thread: app] DEBUG [org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer][publish] - Sending ErrorMessage: failedMessage: xxx
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:373)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:344)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:324)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:297)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262)
at io.micrometer.observation.Observation.observe(Observation.java:492)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$400(AmqpInboundChannelAdapter.java:69)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:374)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:370)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1718)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1637)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1625)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1616)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1561)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.lambda$executeListener$8(AbstractMessageListenerContainer.java:1540)
at io.micrometer.observation.Observation.observe(Observation.java:492)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1540)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1001)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:948)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1326)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1232)
at java.base/java.lang.Thread.run(Thread.java:1589)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=xxx
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 29 more
My guess is that when WebClient gets an error response, it throws an error that results in ErrorCallbackNotImplemented, which in turn crashes the stream.
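This is expected behavior that has been asked about and explained here many times. It is also documented - https://docs.spring.io/spring-cloud-stream/docs/4.0.4-SNAPSHOT/reference/html/spring-cloud-stream.html#_reactive_functions_support In short... Spring Cloud Stream has no control over or visibility into the reactive stream. We simply connect the two fluxes together.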
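Since the framework cannot step in, the error has to be handled inside the reactive pipeline itself. One possible way to do that, as a minimal sketch rather than a definitive fix, is to recover from the failure on the inner Mono so that a single failed call never terminates the outer Flux. The helper name processEach, the class name ConsumerConfig, and the choice to log and skip the failed element are assumptions made for illustration; they are not part of the original code. Whether a failed call should be skipped, retried, or forwarded elsewhere depends on the application.

```kotlin
import java.util.function.Consumer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical helper: runs `fetch` for each element in order and
// drops elements whose call fails, so the outer Flux keeps running.
fun processEach(flux: Flux<String>, fetch: (String) -> Mono<String>): Flux<String> =
    flux.concatMap { s ->
        fetch(s)
            // Log the failure; replace with a real logger in an application.
            .doOnError { e -> System.err.println("Call failed for $s: ${e.message}") }
            // Recover on the inner Mono: the error never reaches the outer Flux.
            .onErrorResume { Mono.empty() }
    }

@Configuration
class ConsumerConfig {

    // Assumption: one shared client instead of creating one per message.
    private val client = WebClient.create()

    @Bean
    fun consumer() = Consumer<Flux<String>> { flux ->
        processEach(flux) { uri ->
            client.get()
                .uri(uri)
                .retrieve()
                .bodyToMono(String::class.java)
        }.subscribe()
    }
}
```

The recovery is attached to the Mono returned inside concatMap, not to the outer Flux. Placing onErrorResume after concatMap would replace the error with a fallback, but the original stream would still be terminated and later messages would still fail with "Dispatcher has no subscribers". Note that with this sketch a failed message is silently skipped rather than sent to the DLQ.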