使用Reactor.core时,Mono对象要启动流是否需要显式订阅?

问题描述 投票:0回答:1

这是Java系统与设备通信相关的服务层代码

@Service
    public class ValveOpenSender implements MessageSender<ValveOpenReply, ValveOpen> {
        private final Logger logger = LoggerFactory.getLogger(getClass());

        @Value("${mqtt.cmd-timeout}")
        private Long cmdTimeout;

        @Autowired
        private MqttGateway mqttGateway;


        @Autowired
        private DeviceService deviceService;

        @Autowired
        private DeviceOperationService deviceOperationService;


        private final Map<String, Sinks.One<ValveOpenReply>> replyProcessor = new ConcurrentHashMap<>();


        @Override
        public Mono<ValveOpenReply> send(ValveOpen message) {
            return send(Mono.just(message));
        }

        @Override
        public Mono<ValveOpenReply> send(Publisher<ValveOpen> message) {
            return Mono.from(message)
                    .flatMap(msg -> {
                        Mono<ValveOpenReply> replyFlux = replyProcessor
                                .computeIfAbsent(msg.getMessageId(), ignore -> {
                                    Long operationId = sequenceGenerator.nextId();

                                    //Generate an operation record with status incomplete

                                    return Sinks.one();
                                })
                                .asMono()
                                .doOnNext(reply -> {

                                    //After receiving the device response, the operation record status changes to Completed
                                })
                                .timeout(Duration.ofSeconds(cmdTimeout), Mono.error(() -> new DeviceOperationException(ErrorCode.TIME_OUT)))
                                .doFinally(signal -> replyProcessor.remove(msg.getMessageId()));

                        //ValveOpen 
                        mqttGateway.sendToMqtt(msg.genTopic(), msg.encode().toString());

                        return replyFlux;
                    });
        }


        //After receiving the device response, the method will be called while executing the doOnNext method above
        public void handleReply(ValveOpenReply reply) {
            Sinks.One<ValveOpenReply> sinks = replyProcessor.get(reply.getMessageId());
            if (sinks == null) {
                logger.warn("No response message processor");
                replyProcessor.remove(reply.getMessageId());
                return;
            }
            sinks.emitValue(reply, (signalType, emitResult) -> {
                throw new DeviceOperationException(ErrorCode.valueOf("Valve opening response processing failed!"));
            });
        }
    }




 @PostMapping("/valveOpen")
    public Mono<?> valveOpen(@RequestBody ValveOpen valveOpen, HttpServletRequest request) {

        valveOpen.setClientType(ClientType.getTypeName(request.getHeader("User-Agent")));

        return valveOpenSender.send(valveOpen)
                .map(reply -> {
                    Assert.isTrue(reply.getCode().shortValue() == 0, "valve open failed");
                    return Mono.just(ResultBuilder.buildActionSuccess());
                });
    }

大家好,这段代码是阀门开启接口,涉及到Java系统和设备之间的通信。通信协议是mqtt。整个代码的目标是向设备发送开阀命令,然后监听设备的响应消息,将操作记录的状态更改为完成。为了达到异步和非阻塞的效果,我们引入了Reactor。核心框架。但首先,据我所知,Mono对象需要整个流程在调用subscribe方法后才能开始工作。但是,Sinks的使用。一、上面代码中构造Mono对象并没有显式调用subscribe方法,但是整个代码可以正常运行。我对此很困惑,希望有人能解决我的疑惑。谢谢。

顺便说一句,我看到了一些关于这个的词,比如“Mono.from 是一种工厂方法,可以将任何类型的 Publisher 适配到 Mono。它只会采用第一个元素并忽略后续信号(完成,错误)如果有的话。这种转换不需要明确的订阅,因为它是冷的和单播的:它在订阅时开始产生信号,并将其资源专用于那个消费者。但我不确定它是否正确

java project-reactor
1个回答
0
投票

我猜有可能在初始化对象并执行某些操作之后,你实际上隐式订阅了,但我不确定这个

© www.soinside.com 2019 - 2024. All rights reserved.