我们正在尝试使用反应式编程(大约100万份文档)将数据从一个存储桶复制到另一个存储桶。我们在这段代码中获得了OOM。我不是rxjava专家,需要帮助以防止OOM。我认为读取比写入更快,并且由于缓冲区变满而导致OOM。代码如下:
CountDownLatch countDownLatch5 = new CountDownLatch(1);
Observable
.from(n1qlKeysForDocsGPC)
.flatMap(new Func1<String, Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call(String key) {
return readPrimaryMainAsyncBucket
.get(key, 10, TimeUnit.SECONDS)
.onErrorResumeNext(readPrimaryMainAsyncBucket.get(key, 10, TimeUnit.SECONDS))
.retry(50)
.switchIfEmpty(Observable.empty())
.onErrorResumeNext(Observable.empty());
}
})
.flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call(JsonDocument jsonDocument) {
return readPrimaryBackupAsyncBucket.upsert(jsonDocument, 10, TimeUnit.SECONDS).retry(50);
}
})
.last()
.doOnTerminate(new Action0() {
@Override
public void call() {
countDownLatch5.countDown();
}
})
.subscribe();
try {
countDownLatch5.await();
logger.info("DataRecoverySchedulers | Completed countDownLatch5");
} catch (InterruptedException e) {
e.printStackTrace();
}
在3.x之前版本的Couchbase Java SDK(在撰写本文时尚未完成)使用RxJava版本1。
flatmap
调用,就像你现在拥有它们一样,将操作发布到内部缓冲区以异步执行,返回一个Observable
来跟踪每一个。这意味着第一个flatmap
将以无限的方式消耗你的from
呼叫的输出。换句话说,它将比操作发生的速度更快地读取整个列表。我希望你看到的OOM错误来自于超越Couchbase内部缓冲区。
要更正此问题,您可以使用flatmap
的变体来限制未完成订阅的数量。您只需在flatmap
调用中添加第二个整数参数即可。因此,你有.flatmap(new Func1<~>..., 10)
一次限制自己10个优秀的操作。
Couchbase中的默认缓冲区大约有16000个未完成的操作,但这远远超过了大多数系统所需要的。
供参考,请参阅此相关Stack Overflow post关于限制文件上载的吞吐量。