如何从阻塞队列创建reactor Flux?

问题描述 投票:0回答:1

我正在尝试实现从BlockingQueue创建的反应器Flux,但不确定哪个运算符最适合我的用例?

我正在创建一个流式REST端点,其中响应是Flux,它需要继续从BlockingQueue发出消息作为对GET REST调用的响应。

我已经尝试过论坛和文档,只能找到从可迭代集合或被动数据源启动的Flux,但没有来自任何BlockingQueue的示例。

java spring-webflux project-reactor
1个回答
2
投票

你可以尝试Flux#generateQueue#peek。请记住,如果队列为空,peek将返回null,并且它不能在onNext中使用。

就像是:

Flux.generate(sink -> {
    val element = queue.peek();
    if (element == null) {
        sink.complete();
    } else {
        sink.next(element);
    }
});

还有Flux#repeatWhen运算符,如果你想在它被认为是空的时候重新订阅队列,例如有:

flux.repeatWhen(it -> it.delayElements(ofSeconds(1)))
© www.soinside.com 2019 - 2024. All rights reserved.