假设我有一个公开两个方法的API,每个方法返回一个observable
import org.assertj.core.util.VisibleForTesting;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
class SomeApiClass {
private static final String[] doOnSubscribeThread = new String[1];
static Observable<Integer> immediatelyDoWork() {
return Observable.just(1, 2)
.doOnSubscribe(ignore -> doOnSubscribeThread[0] = Thread.currentThread().getName())
.flatMap(ignore -> doWork());
}
static Observable<Integer> periodicallyDoWork() {
// interval is using default computation scheduler
return Observable.interval(1, TimeUnit.SECONDS)
.doOnSubscribe(ignore -> doOnSubscribeThread[0] = Thread.currentThread().getName())
.flatMap(ignore -> doWork());
}
@VisibleForTesting
static String getSubscribedOnThread() {
return doOnSubscribeThread[0];
}
private static Observable<Integer> doWork() {
return Observable.create(emitter -> {
Random random = new Random();
emitter.onNext(random.nextInt());
emitter.onComplete();
});
}
大多数API只是让调用应用程序设置subscribeOn
线程(想象这些测试是我的应用程序):
import org.junit.Test;
import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;
import static com.google.common.truth.Truth.assertThat;
public class ExampleTest {
@Test
public void canSetSubscribeOnThread() {
Observable<Integer> coloObservable = SomeApiClass.immediatelyDoWork()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
TestObserver<Integer> testObserver = coloObservable.test();
testObserver.awaitCount(2); // wait for a few emissions
assertThat(SomeApiClass.getSubscribedOnThread()).contains("RxNewThreadScheduler");
}
@Test
public void canSetSubscribeOnThreadIfApiUsesInterval() {
Observable<Integer> coloObservable = SomeApiClass.periodicallyDoWork()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
TestObserver<Integer> testObserver = coloObservable.test();
testObserver.awaitCount(2); // wait for a few emissions
assertThat(SomeApiClass.getSubscribedOnThread()).contains("RxNewThreadScheduler");
}
}
在immediate
示例中的IIUC所有订阅副作用(包括just()
)都将在新线程上发生。 Karnok explains well here。
但在periodic
示例中,interval
将使用默认(计算)调度程序。大多数API在这种情况下做了什么?除了interval
本身之外,他们是否让调用者为所有订阅副作用设置了subscribeOn线程?在上面的periodic
测试中,除了interval
之外,我们仍然可以设置subscribeOn线程。或者他们也添加一个参数来设置这个subscribeOn:
/**
* Works like {@link #periodicallyDoWork()} but allows caller to set subscribeOnSchedueler
*/
static Observable<Integer> periodicallyDoWork(Scheduler subscribeOnScheduler) {
return Observable.interval(1, TimeUnit.SECONDS, subscribeOnScheduler)
.doOnSubscribe(ignore -> doOnSubscribeThread[0] = Thread.currentThread().getName())
.flatMap(ignore -> doWork());
}
然后允许调用者省略subscribeOn()方法:
@Test
public void canSetSubscribeOnThreadIfApiUsesInterval() {
Observable<Integer> coloObservable = SomeApiClass.periodicallyDoWork(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
TestObserver<Integer> testObserver = coloObservable.test();
testObserver.awaitCount(2); // wait for a few emissions
assertThat(SomeApiClass.getSubscribedOnThread()).contains("RxNewThreadScheduler");
}
这有点矫枉过正吗?只要调用者也调用subscribeOn(),让interval
使用默认计算调度程序有什么危险吗?
在我看来,创建观察者链的API必须提供注入调度程序的方法。没有这种能力,单元测试几乎无法管理。
我在为实时系统编写测试方面有相当多的经验。只需能够为被测单元提供一个或两个TestScheduler
,就可以在合理测试和不打扰之间做出区别。考虑一个debounce()
方法周期为1秒的子系统。如果不能使用TestScheduler
并使用advanceTimeBy()
来控制时钟,则编写几十个案例的单元测试是不可行的。这意味着单元测试可以在10毫秒内完成,如果使用常规调度程序则需要几分钟。