如何迭代Flux中的对象并对其进行操作?

问题描述 投票:0回答:1

我正在使用项目反应器,我想执行以下操作:

    @Override
    public void run(ApplicationArguments args) {
        Flux.from(KafkaReceiver.create(receiverOptions)
                        .receive()
                        .map(this::getObject)
                        .flatMap(this::iterateElasticWrites)
                        .flatMap(this::writeTheWholeObjectToS3)
        ).subscribe();
    }

    // What I'd like to do - but a non reactive code
    private Publisher<MyObj> iterateElasticWrites(MyObj message) {
        for (MyDoc file: message.getDocs()) {
            writeElasticDoc(file.getText());
        }
        return Mono.just(message);
    }

我正在努力寻找与 Project Reactor 中的

iterateElasticWrites
相当的内容。我想对我的一个对象执行迭代 (
MyObj
),并将其每个文档列表的元素反应式写入 elasticsearch 中。

spring-webflux project-reactor reactor reactive-streams
1个回答
5
投票

在 Reactor 中,您始终需要使用不同的运算符构建反应式流程,并且所有反应式/异步代码应返回

Mono
Flux

看看你的例子,它可能看起来像

private Mono<MyObj> iterateElasticWrites(MyObj message) {
    return Flux.fromIterable(message.getDocs())
            .flatMap(doc -> writeElasticDoc(doc.getText()))
            .thenReturn(message);
}

其中

writeElasticDoc
可以定义为

private Mono<Void> writeElasticDoc(String text) {
    ...
}
© www.soinside.com 2019 - 2024. All rights reserved.