我想将Spring Webflux与服务器发送事件和Netty一起使用。我能够在客户端接收事件。但是,当客户端取消订阅时,服务器将始终发送另外两个客户端不会接收的事件。我相信这不取决于尝试发送事件之间的超时。即使更改超时,每次也增加2条消息。但是,如果我将两次尝试之间的超时时间增加到例如10秒,则只能再尝试一次。因此,可以在发送事件之前确定已取消的订阅吗?对我来说,保证至少一次交货很重要。
这是我的控制器:
@GetMapping
public Flux<ServerSentEvent> stream() {
return Flux.<ServerSentEvent>push(emitter -> {
long counter = 0;
while (!emitter.isCancelled()) {
emitter.next(ServerSentEvent.builder(++counter).build());
if (emitter.isCancelled()) {
System.err.println("Cancelled");
} else {
System.err.println(counter);
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, OverflowStrategy.ERROR);
}
这是客户日志的结尾:
...
data:212
data:213
data:214
这是服务器日志的结尾:
...
213
214
215
216
Cancelled
您应该尝试不使用try/catch
,尤其是在反应性应用程序中不要使用Thread#sleep
。
@GetMapping
public Flux<ServerSentEvent> stream() {
return Flux.interval(Duration.ofSeconds(1))
.map(counter -> ServerSentEvent.builder(counter).build());
}
如果只想在一个时间间隔内生成事件,则可以使用interval
功能来完成。
我建议您阅读Reactive文档,它的确非常有趣。
https://projectreactor.io/docs/core/release/reference/#getting-started