When asynchronous operations need to be executed inside an Rx operator chain:
private static Single<String> process(final String s) {
    return Single.just(s)
        .map(String::toUpperCase)
        .doOnSuccess(str -> saveValue1(str)) // asynchronous op 1
        .doOnSuccess(str -> saveValue2(str)); // asynchronous op 2
}
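Is there an alternative way to write this Rx chain, without flatMap or blockingGet, that builds the execution of the Completable-returning methods saveValue1, saveValue2, and so on into the chain? With those two operators, the chain looks like this: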
private static Single<String> process(final String s) {
    return Single.just(s)
        .map(String::toUpperCase)
        .doOnSuccess(str -> saveValue1(str).blockingGet()) // asynchronous op 1
        .doOnSuccess(str -> saveValue2(str).blockingGet()); // asynchronous op 2
}
or
private static Single<String> process(final String s) {
    return Single.just(s)
        .map(String::toUpperCase)
        .flatMap(str -> saveValue1(str).toSingleDefault(str)) // asynchronous op 1
        .flatMap(str -> saveValue2(str).toSingleDefault(str)); // asynchronous op 2
}
Example to reproduce:
package com.eisgroup.genesis.policy.core.lifecycle.commands.quote;

import io.reactivex.Completable;
import io.reactivex.Single;
import org.springframework.util.Assert;

public class Test {

    // some variables to check that asynchronous save works
    static String s1;
    static String s2;

    public static void main(String[] args) {
        process("test").blockingGet();
        // Assert.hasText(text, message) only checks that the text is non-blank,
        // so compare against the expected value explicitly.
        Assert.isTrue("TEST".equals(s1), "s1 should be TEST");
        Assert.isTrue("TEST".equals(s2), "s2 should be TEST");
    }

    // process(String) is one of the variants shown above

    // some fake examples to show asynchronous save
    private static Completable saveValue1(String s) {
        return Completable.defer(() -> {
            s1 = s;
            return Completable.complete();
        });
    }

    private static Completable saveValue2(String s) {
        return Completable.defer(() -> {
            s2 = s;
            return Completable.complete();
        });
    }
}
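For context, here is a minimal sketch of why the plain doOnSuccess version at the top does not save anything: a Completable is lazy, so the one returned inside the lambda is created and then dropped without ever being subscribed. The class name LazyCompletableSketch and the helper save are hypothetical stand-ins for illustration, not part of the code above.

```java
import io.reactivex.Completable;
import io.reactivex.Single;

public class LazyCompletableSketch {

    // Hypothetical stand-in for s1/s2: set only when the Completable actually runs.
    static String saved;

    // Hypothetical stand-in for saveValue1: does nothing until it is subscribed.
    static Completable save(String s) {
        return Completable.fromAction(() -> saved = s);
    }

    // Same shape as the first snippet: the Completable is built, then discarded.
    static Single<String> withDoOnSuccess(String s) {
        return Single.just(s)
                .map(String::toUpperCase)
                .doOnSuccess(str -> save(str));
    }

    // Same shape as the flatMap snippet: the chain subscribes to the Completable.
    static Single<String> withFlatMap(String s) {
        return Single.just(s)
                .map(String::toUpperCase)
                .flatMap(str -> save(str).toSingleDefault(str));
    }

    public static void main(String[] args) {
        saved = null;
        withDoOnSuccess("test").blockingGet();
        System.out.println("doOnSuccess saved: " + saved); // prints "doOnSuccess saved: null"

        withFlatMap("test").blockingGet();
        System.out.println("flatMap saved: " + saved); // prints "flatMap saved: TEST"
    }
}
```

So whatever operator is chosen, something in the chain has to subscribe to the Completable; blockingGet does that by blocking inside the callback, flatMap does it as part of the chain.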
Try this:
// additionally requires: import io.reactivex.schedulers.Schedulers;

public static void main(String[] args) {
    process("test").blockingAwait();
    System.out.println(s1.equals("TEST"));
    System.out.println(s2.equals("TEST"));
}

private static Completable process(final String s) {
    return Single.just(s)
        .map(String::toUpperCase)
        .flatMapCompletable(string -> Completable.mergeArray(
            saveValue1(string).subscribeOn(Schedulers.io()),
            saveValue2(string).subscribeOn(Schedulers.io())));
}
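Note that saveValue1 and saveValue2 will run in parallel here. I don't know whether that is what you want, but I would encourage it for the sake of efficiency and speed.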
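If the order of the two saves matters, a sequential variant is possible as well. The following is only a sketch under assumptions: it swaps Completable.mergeArray for Completable.concatArray and drops the subscribeOn calls, and the class name SequentialSaveSketch, the log list, and the bodies of saveValue1 and saveValue2 are hypothetical stand-ins used to make the ordering visible.

```java
import io.reactivex.Completable;
import io.reactivex.Single;
import java.util.ArrayList;
import java.util.List;

public class SequentialSaveSketch {

    // Hypothetical log that records the order in which the saves actually run.
    static final List<String> log = new ArrayList<>();

    // Hypothetical stand-ins for the real asynchronous save operations.
    static Completable saveValue1(String s) {
        return Completable.fromAction(() -> log.add("save1:" + s));
    }

    static Completable saveValue2(String s) {
        return Completable.fromAction(() -> log.add("save2:" + s));
    }

    // concatArray subscribes to saveValue2 only after saveValue1 has completed.
    static Completable process(final String s) {
        return Single.just(s)
                .map(String::toUpperCase)
                .flatMapCompletable(string -> Completable.concatArray(
                        saveValue1(string),
                        saveValue2(string)));
    }

    public static void main(String[] args) {
        process("test").blockingAwait();
        System.out.println(log); // prints [save1:TEST, save2:TEST]
    }
}
```

With concatArray, an error in saveValue1 also means saveValue2 is never subscribed, which may or may not be the behavior you want.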