如何创建被动“中间用户”?如何结合所有Flux 零件和加工结果进一步链(转换为Mono )?

问题描述 投票:1回答:1

我正在使用spring webflux + mongodb-reactive将二进制文件(图像)保存到Mongo DB。不幸的是,spring-boot-starter-data-mongodb-reactive:2.0.5.RELEASE不支持GridFsTemplate功能的反应式编程。所以我决定创建一个订阅者,它将获取所有DataBuffer部分,将它们组合并转换为InputStream,因此GridFsTemplate::store将成为可能:

public class GridFsTemplateSubscriber extends BaseSubscriber<DataBuffer> {
private GridFsTemplate gridFsTemplate;
private List<DataBuffer> dataBuffers;
private String fileName;

public GridFsTemplateSubscriber(GridFsTemplate gridFsTemplate, String fileName) {
    this.gridFsTemplate = gridFsTemplate;
    this.fileName = fileName;
    dataBuffers = new ArrayList<>();
}

public void hookOnNext(DataBuffer dataBuffer) {
    dataBuffers.add(dataBuffer);
    request(1);
}

public void hookOnComplete() {
    DefaultDataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
    InputStream inputStream = defaultDataBufferFactory.join(dataBuffers).asInputStream();
    ObjectId objectId = gridFsTemplate.store(inputStream, fileName);
}
}

问题是我想返回objectId进行进一步处理,但hookOnComplete是无效的类型。更多..我想从这里获得ObjectId的Mono,所以我可以以反应的方式进一步处理它。在这种情况下,正如我所理解的那样,我不应该使用“真正的”订阅者,而应该将Flux<T>的结果和onComplete结合起来,返回Mono<R>。项目反应堆有这样的能力吗?我是反应式编程的新手,所以我可能会错过整个想法,所以请指导我如何实现这一目标。

在我以前的解决方案中,我使用block()结束反应链,所以我得到了ObjectId,然后我用新链发出了对象id。但这肯定不是一个好的解决方案。

java spring-webflux project-reactor reactor
1个回答
1
投票

不幸的是,整个方法都在寻找麻烦,不建议这样做。

实施Subscriber(甚至使用BaseSubscriber)几乎从来都不是解决方案。如果你使用的驱动程序库不支持反应流,那么你should probably wrap it as blocking code

这样做会在途中丢失许多功能(运行时行为,背压等),但如果您的订户实现不完美,至少您不会冒险炸毁整个应用程序。

在您的核心库/驱动程序支持反应流之前,您应该坚持使用Spring MVC,它支持异步概念和某些情况下的Flux Mono返回类型。

© www.soinside.com 2019 - 2024. All rights reserved.