我正在尝试实现从BlockingQueue创建的反应器Flux,但不确定哪个运算符最适合我的用例?
我正在创建一个流式REST端点,其中响应是Flux,它需要继续从BlockingQueue发出消息作为对GET REST调用的响应。
我已经尝试过论坛和文档,只能找到从可迭代集合或被动数据源启动的Flux,但没有来自任何BlockingQueue的示例。
你可以尝试Flux#generate和Queue#peek。请记住,如果队列为空,peek
将返回null
,并且它不能在onNext
中使用。
就像是:
Flux.generate(sink -> {
val element = queue.peek();
if (element == null) {
sink.complete();
} else {
sink.next(element);
}
});
还有Flux#repeatWhen运算符,如果你想在它被认为是空的时候重新订阅队列,例如有:
flux.repeatWhen(it -> it.delayElements(ofSeconds(1)))