Dispatcher 在任何错误后都没有订阅者

问题描述 投票:0回答:1

我们有一个 spring-cloud-stream 反应函数来处理消息事件,flux 有一个 flatMap 来使用 WebClient 进行外部 http 调用。每次 WebClient 出错时,FluxMessageChannel 的 MessageHandler 都会取消订阅该频道;然后,我们无法再向其发送任何事件,而不会出现有关消息通道有 0 个订阅者的错误。我们怎样才能避免取消订阅我们的处理程序。

这是我的代码:

...

private final WebClient profileServiceClient;

private Function<Message<MessageParticipant>, Mono<ProfileSchema>> getProfile = msg -> {
    final String profileId = msg.getPayload().getProfileId();
    return profileServiceClient.get()
            .uri(uriBuilder -> uriBuilder.path("/profiles/{id}").build(profileId))
            .retrieve()
            .bodyToMono(ProfileSchema.class);
};

@Bean
public Function<Flux<Message<MessageParticipant>>, Flux<Message<ProfileSchema>>> gatherProfile(ProfileService profileService) {
    return messages -> {
        return messages.log(log.getName(), Level.FINEST)
                .flatMap(getProfile)
                .map(p -> MessageBuilder.withPayload(p).build());
    };
}

这里是错误信息

    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76) ~[spring-integration-core-5.5.12.jar:5.5.12]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.12.jar:5.5.12]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.12.jar:5.5.12]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.20.jar:5.3.20]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.20.jar:5.3.20]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.20.jar:5.3.20]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.20.jar:5.3.20]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216) ~[spring-integration-core-5.5.12.jar:5.5.12]
    at de.idealo.spring.stream.binder.sqs.inbound.SqsInboundChannelAdapter.access$400(SqsInboundChannelAdapter.java:33) ~[spring-cloud-stream-binder-sqs-1.9.0.jar:na]
    at de.idealo.spring.stream.binder.sqs.inbound.SqsInboundChannelAdapter$IntegrationQueueMessageHandler.handleMessageInternal(SqsInboundChannelAdapter.java:162) ~[spring-cloud-stream-binder-sqs-1.9.0.jar:na]
    at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:458) ~[spring-messaging-5.3.20.jar:5.3.20]
    at io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer.executeMessage(SimpleMessageListenerContainer.java:222) ~[spring-cloud-aws-messaging-2.4.2.jar:2.4.2]
    at io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer$MessageGroupExecutor.run(SimpleMessageListenerContainer.java:426) ~[spring-cloud-aws-messaging-2.4.2.jar:2.4.2]
    at io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable.run(SimpleMessageListenerContainer.java:310) ~[spring-cloud-aws-messaging-2.4.2.jar:2.4.2]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers**
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139) ~[spring-integration-core-5.5.12.jar:5.5.12]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.12.jar:5.5.12]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.12.jar:5.5.12]
    ... 16 common frames omitted

我们尝试用

onStatus
没有成功

private Function<Message<MessageParticipant>, Mono<ProfileSchema>> getProfile = msg -> {
    final String profileId = msg.getPayload().getProfileId();
    return profileServiceClient.get()
            .uri(uriBuilder -> uriBuilder.path("/profiles/{id}").build(profileId))
            .retrieve()
            .onStatus(HttpStatus::isError, clientResponse -> {
                        log.error("Error while getting profile {} with status code {}", profileId, clientResponse.statusCode());
                        throw new ServiceException("Unable to retrieve profile for profile id : " + profileId);
                    })

            .bodyToMono(ProfileSchema.class);
};
spring-webflux spring-cloud-stream reactive flatmap
1个回答
0
投票

这已经被多次提及,所以我们不得不记录下来。基本上对于反应函数有一个常见的误解,人们认为我们可以控制流。我们没有。 . .这就是响应式 API 的本质。用户功能作为用户提供我们连接到源/目标流的流(即 Flux)的机制。一旦连接(在启动期间),就好像 s-c-stream 甚至不存在一样,因为它无法控制流和任何潜在的错误。这在命令式函数的情况下是根本不同的,函数充当消息处理程序并由框架在每个消息上调用,从而允许我们控制它的调用和处理错误。 总结:

  • 反应函数 - 在初始化期间仅由框架调用一次以连接流。
  • 命令式函数 - 框架在每个事件(消息)上调用并完全由框架控制。

更多信息-https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_reactive_functions_support

基本上reactive api提供了一套非常丰富的操作来处理错误和重试,所以我们推荐使用那些。

© www.soinside.com 2019 - 2024. All rights reserved.