这是Java系统与设备通信相关的服务层代码
@Service
public class ValveOpenSender implements MessageSender<ValveOpenReply, ValveOpen> {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Value("${mqtt.cmd-timeout}")
private Long cmdTimeout;
@Autowired
private MqttGateway mqttGateway;
@Autowired
private DeviceService deviceService;
@Autowired
private DeviceOperationService deviceOperationService;
private final Map<String, Sinks.One<ValveOpenReply>> replyProcessor = new ConcurrentHashMap<>();
@Override
public Mono<ValveOpenReply> send(ValveOpen message) {
return send(Mono.just(message));
}
@Override
public Mono<ValveOpenReply> send(Publisher<ValveOpen> message) {
return Mono.from(message)
.flatMap(msg -> {
Mono<ValveOpenReply> replyFlux = replyProcessor
.computeIfAbsent(msg.getMessageId(), ignore -> {
Long operationId = sequenceGenerator.nextId();
//Generate an operation record with status incomplete
return Sinks.one();
})
.asMono()
.doOnNext(reply -> {
//After receiving the device response, the operation record status changes to Completed
})
.timeout(Duration.ofSeconds(cmdTimeout), Mono.error(() -> new DeviceOperationException(ErrorCode.TIME_OUT)))
.doFinally(signal -> replyProcessor.remove(msg.getMessageId()));
//ValveOpen
mqttGateway.sendToMqtt(msg.genTopic(), msg.encode().toString());
return replyFlux;
});
}
//After receiving the device response, the method will be called while executing the doOnNext method above
public void handleReply(ValveOpenReply reply) {
Sinks.One<ValveOpenReply> sinks = replyProcessor.get(reply.getMessageId());
if (sinks == null) {
logger.warn("No response message processor");
replyProcessor.remove(reply.getMessageId());
return;
}
sinks.emitValue(reply, (signalType, emitResult) -> {
throw new DeviceOperationException(ErrorCode.valueOf("Valve opening response processing failed!"));
});
}
}
@PostMapping("/valveOpen")
public Mono<?> valveOpen(@RequestBody ValveOpen valveOpen, HttpServletRequest request) {
valveOpen.setClientType(ClientType.getTypeName(request.getHeader("User-Agent")));
return valveOpenSender.send(valveOpen)
.map(reply -> {
Assert.isTrue(reply.getCode().shortValue() == 0, "valve open failed");
return Mono.just(ResultBuilder.buildActionSuccess());
});
}
大家好,这段代码是阀门开启接口,涉及到Java系统和设备之间的通信。通信协议是mqtt。整个代码的目标是向设备发送开阀命令,然后监听设备的响应消息,将操作记录的状态更改为完成。为了达到异步和非阻塞的效果,我们引入了Reactor。核心框架。但首先,据我所知,Mono对象需要整个流程在调用subscribe方法后才能开始工作。但是,Sinks的使用。一、上面代码中构造Mono对象并没有显式调用subscribe方法,但是整个代码可以正常运行。我对此很困惑,希望有人能解决我的疑惑。谢谢。
顺便说一句,我看到了一些关于这个的词,比如“Mono.from 是一种工厂方法,可以将任何类型的 Publisher 适配到 Mono。它只会采用第一个元素并忽略后续信号(完成,错误)如果有的话。这种转换不需要明确的订阅,因为它是冷的和单播的:它在订阅时开始产生信号,并将其资源专用于那个消费者。但我不确定它是否正确
我猜有可能在初始化对象并执行某些操作之后,你实际上隐式订阅了,但我不确定这个