使用反应式Couchbase Java驱动程序的批处理

问题描述 投票:0回答:1

假设我有一个存储桶,需要从该存储桶中获取日期早于现在的文档。该文档如下所示:

{
id: "1",
date: "Some date",
otherObjectKEY: "key1"
}

对于每个文档,我需要使用其otherObjectKEY获取另一个文档,将另一个文档发送给kafka主题,然后删除原始文档。

使用响应式Java驱动程序3.0,我能够用类似的方法做到这一点:

public void batch(){
    streamOriginalObjects()
         .flatMap(originalObject -> fetchOtherObjectUsingItsKEY(originalObject)
                       .flatMap(otherObject -> sendToKafkaAndDeleteOriginalObject(originalObject))
         )
         .subscribe();
}

streamOriginalObjects():

public Flux<OriginalObject> streamOriginalObjects(){
        return client.query("select ... and date <= '"+ LocalDateTime.now().toString() +"'")
                .flux()
                .flatMap(result -> result.rowsAs(OriginalObject.class));
    }

[它的工作原理与预期的一样,但是我想知道是否有比逐个流处理每个元素更好的方法(特别是在性能方面)。

reactive-programming couchbase
1个回答
0
投票

进行N1QL查询,然后从中散发键值操作是一种有用且常见的模式。这应该使扇出并行发生:

    streamOriginalObjects()
        // Split into numberOfThreads 'rails'
        .parallel(numberOfThreads)

        // Run on an unlimited thread pool
        .runOn(Schedulers.elastic())

        .concatMap(originalObject -> fetchOtherObjectUsingItsKEY(originalObject)
            .concatMap(otherObject -> sendToKafkaAndDeleteOriginalObject(originalObject))
        )

        // Back out of parallel mode
        .sequential()
        .subscribe();
© www.soinside.com 2019 - 2024. All rights reserved.