Spring Cloud Stream: Dispatcher has no subscribers after any error


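We have a spring-cloud-stream reactive function that processes message events; its Flux uses a flatMap to make an external HTTP call with WebClient. Every time the WebClient call fails, the MessageHandler of the FluxMessageChannel unsubscribes from the channel. After that, every event we send to the channel fails with an error saying the message channel has 0 subscribers. How can we prevent our handler from being unsubscribed?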

Here is my code:

    ...

    private final WebClient profileServiceClient;

    private Function<Message<MessageParticipant>, Mono<ProfileSchema>> getProfile = msg -> {
        final String profileId = msg.getPayload().getProfileId();
        return profileServiceClient.get()
                .uri(uriBuilder -> uriBuilder.path("/profiles/{id}").build(profileId))
                .retrieve()
                .bodyToMono(ProfileSchema.class);
    };

    @Bean
    public Function<Flux<Message<MessageParticipant>>, Flux<Message<ProfileSchema>>> gatherProfile(ProfileService profileService) {
        return messages -> {
            return messages.log(log.getName(), Level.FINEST)
                    .flatMap(getProfile)
                    .map(p -> MessageBuilder.withPayload(p).build());
        };
    }
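The behaviour described above is consistent with how Reactor treats errors: an error signal is terminal, so when one inner Mono inside flatMap fails, the outer Flux is cancelled and later elements are never processed. Below is a minimal sketch of that effect using only reactor-core. The `lookup` method is a hypothetical stand-in for the WebClient call, not the real profile service.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class TerminalErrorDemo {

    // Hypothetical stand-in for the WebClient call: fails for the id "bad".
    static Mono<String> lookup(String id) {
        return "bad".equals(id)
                ? Mono.error(new IllegalStateException("lookup failed for " + id))
                : Mono.just("profile-" + id);
    }

    // Runs the pipeline and records every signal the subscriber receives.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        Flux.just("1", "bad", "2")
                .flatMap(TerminalErrorDemo::lookup)
                .subscribe(
                        seen::add,
                        error -> seen.add("ERROR: " + error.getMessage()));
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the element after the failing one is never looked up, which mirrors the subscription to the channel going away after the first failed HTTP call.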

Here is the error message:

    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76) ~[spring-integration-core-5.5.12.jar:5.5.12]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.12.jar:5.5.12]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.12.jar:5.5.12]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.20.jar:5.3.20]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.20.jar:5.3.20]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.20.jar:5.3.20]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.20.jar:5.3.20]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216) ~[spring-integration-core-5.5.12.jar:5.5.12]
    at de.idealo.spring.stream.binder.sqs.inbound.SqsInboundChannelAdapter.access$400(SqsInboundChannelAdapter.java:33) ~[spring-cloud-stream-binder-sqs-1.9.0.jar:na]
    at de.idealo.spring.stream.binder.sqs.inbound.SqsInboundChannelAdapter$IntegrationQueueMessageHandler.handleMessageInternal(SqsInboundChannelAdapter.java:162) ~[spring-cloud-stream-binder-sqs-1.9.0.jar:na]
    at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:458) ~[spring-messaging-5.3.20.jar:5.3.20]
    at io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer.executeMessage(SimpleMessageListenerContainer.java:222) ~[spring-cloud-aws-messaging-2.4.2.jar:2.4.2]
    at io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer$MessageGroupExecutor.run(SimpleMessageListenerContainer.java:426) ~[spring-cloud-aws-messaging-2.4.2.jar:2.4.2]
    at io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable.run(SimpleMessageListenerContainer.java:310) ~[spring-cloud-aws-messaging-2.4.2.jar:2.4.2]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
**Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers**
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139) ~[spring-integration-core-5.5.12.jar:5.5.12]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.12.jar:5.5.12]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.12.jar:5.5.12]
    ... 16 common frames omitted

We tried onStatus, but without success:

    private Function<Message<MessageParticipant>, Mono<ProfileSchema>> getProfile = msg -> {
        final String profileId = msg.getPayload().getProfileId();
        return profileServiceClient.get()
                .uri(uriBuilder -> uriBuilder.path("/profiles/{id}").build(profileId))
                .retrieve()
                .onStatus(HttpStatus::isError, clientResponse -> {
                    log.error("Error while getting profile {} with status code {}", profileId, clientResponse.statusCode());
                    throw new ServiceException("Unable to retrieve profile for profile id : " + profileId);
                })
                .bodyToMono(ProfileSchema.class);
    };
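
Note that onStatus only changes which exception is raised; the error still travels down the outer Flux. One direction that may be worth trying is to recover on the inner Mono, inside flatMap, so the error signal never reaches the function's Flux. Below is a minimal sketch with reactor-core only. The `lookup` method is a hypothetical stand-in for the `profileServiceClient` call, and dropping the failed message with `Mono.empty()` is an assumption about the desired behaviour; a real application might log it or route it to a dead-letter destination instead.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class InnerRecoveryDemo {

    // Hypothetical stand-in for the WebClient call: fails for the id "bad".
    static Mono<String> lookup(String id) {
        return "bad".equals(id)
                ? Mono.error(new IllegalStateException("lookup failed for " + id))
                : Mono.just("profile-" + id);
    }

    // Recover on the inner Mono so a failure stays local to one element
    // and the outer Flux keeps its subscription.
    static Flux<String> resilient(Flux<String> ids) {
        return ids.flatMap(id -> lookup(id)
                .onErrorResume(error -> Mono.empty()));
    }

    public static void main(String[] args) {
        List<String> result = resilient(Flux.just("1", "bad", "2"))
                .collectList()
                .block();
        System.out.println(result);
    }
}
```

Applied to the code in the question, the equivalent change would be to append an `onErrorResume` to the Mono returned by `getProfile`, after `bodyToMono(ProfileSchema.class)`.
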
spring-webflux spring-cloud-stream reactive flatmap