从API设置订阅线程的最佳实践

问题描述 投票:0回答:1

假设我有一个公开两个方法的API,每个方法返回一个observable

import org.assertj.core.util.VisibleForTesting;

import java.util.Random;
import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.Scheduler;

class SomeApiClass {

    private static final String[] doOnSubscribeThread = new String[1];

    static Observable<Integer> immediatelyDoWork() {
        return Observable.just(1, 2)
                .doOnSubscribe(ignore -> doOnSubscribeThread[0] = Thread.currentThread().getName())
                .flatMap(ignore -> doWork());
    }

    static Observable<Integer> periodicallyDoWork() {
        // interval is using default computation scheduler
        return Observable.interval(1, TimeUnit.SECONDS)
                .doOnSubscribe(ignore -> doOnSubscribeThread[0] = Thread.currentThread().getName())
                .flatMap(ignore -> doWork());
    }

    @VisibleForTesting
    static String getSubscribedOnThread() {
        return doOnSubscribeThread[0];
    }

    private static Observable<Integer> doWork() {
        return Observable.create(emitter -> {
            Random random = new Random();
            emitter.onNext(random.nextInt());
            emitter.onComplete();
        });
    }

大多数API只是让调用应用程序设置subscribeOn线程(想象这些测试是我的应用程序):

import org.junit.Test;

import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;

import static com.google.common.truth.Truth.assertThat;

public class ExampleTest {

    @Test
    public void canSetSubscribeOnThread() {
        Observable<Integer> coloObservable = SomeApiClass.immediatelyDoWork()
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread());

        TestObserver<Integer> testObserver = coloObservable.test();
        testObserver.awaitCount(2); // wait for a few emissions

        assertThat(SomeApiClass.getSubscribedOnThread()).contains("RxNewThreadScheduler");
    }

    @Test
    public void canSetSubscribeOnThreadIfApiUsesInterval() {
        Observable<Integer> coloObservable = SomeApiClass.periodicallyDoWork()
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread());

        TestObserver<Integer> testObserver = coloObservable.test();
        testObserver.awaitCount(2); // wait for a few emissions

        assertThat(SomeApiClass.getSubscribedOnThread()).contains("RxNewThreadScheduler");
    }
}

immediate示例中的IIUC所有订阅副作用(包括just())都将在新线程上发生。 Karnok explains well here

但在periodic示例中,interval将使用默认(计算)调度程序。大多数API在这种情况下做了什么?除了interval本身之外,他们是否让调用者为所有订阅副作用设置了subscribeOn线程?在上面的periodic测试中,除了interval之外,我们仍然可以设置subscribeOn线程。或者他们也添加一个参数来设置这个subscribeOn:

/**
 * Works like {@link #periodicallyDoWork()} but allows caller to set subscribeOnSchedueler
 */
static Observable<Integer> periodicallyDoWork(Scheduler subscribeOnScheduler) {
    return Observable.interval(1, TimeUnit.SECONDS, subscribeOnScheduler)
            .doOnSubscribe(ignore -> doOnSubscribeThread[0] = Thread.currentThread().getName())
            .flatMap(ignore -> doWork());
}

然后允许调用者省略subscribeOn()方法:

@Test
public void canSetSubscribeOnThreadIfApiUsesInterval() {
    Observable<Integer> coloObservable = SomeApiClass.periodicallyDoWork(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread());

    TestObserver<Integer> testObserver = coloObservable.test();
    testObserver.awaitCount(2); // wait for a few emissions

    assertThat(SomeApiClass.getSubscribedOnThread()).contains("RxNewThreadScheduler");
}

这有点矫枉过正吗?只要调用者也调用subscribeOn(),让interval使用默认计算调度程序有什么危险吗?

rx-java2
1个回答
1
投票

在我看来,创建观察者链的API必须提供注入调度程序的方法。没有这种能力,单元测试几乎无法管理。

我在为实时系统编写测试方面有相当多的经验。只需能够为被测单元提供一个或两个TestScheduler,就可以在合理测试和不打扰之间做出区别。考虑一个debounce()方法周期为1秒的子系统。如果不能使用TestScheduler并使用advanceTimeBy()来控制时钟,则编写几十个案例的单元测试是不可行的。这意味着单元测试可以在10毫秒内完成,如果使用常规调度程序则需要几分钟。

© www.soinside.com 2019 - 2024. All rights reserved.