我正在尝试在以下代码中添加一些单元测试:
public List<Stuff> extractStuff(Scheduler scheduler, List<Token> tokens) {
List<Observable<List<Stuff>>> observables = tokens
.stream()
.map(token-> Observable.just(getStuffByToken(token)).subscribeOn(scheduler))
.collect(Collectors.toList());
List<Stuff> result = new ArrayList<>();
for (List<Stuff> stuff: Observable.merge(observables).toBlocking().toIterable()) {
result.addAll(stuff);
}
return result;
}
我希望以并行方式获取Stuff
对象,但是我需要在进行任何进一步操作之前收集所有对象(否则,下一个过程没有任何意义。)>
代码按预期工作,但是我在单元测试中苦苦挣扎:
@Test public void extractStuff() { // GIVEN TestScheduler scheduler = new TestScheduler(); List<Token> tokens = buildTokens(); ... // WHEN List<Stuff> result = this.instance.extractStuff(scheduler, tokens); // Execution never comes to this point... // THEN ... }
使用调试器,我可以看到
Observable.just(...)
看起来不错(我的可观察对象列表不为空,并且我可以在其中看到我的自定义模拟对象)。
问题:
执行似乎停留在Observable.merge(observables).toBlocking()
表达式上。永远不会调用result.addAll
行。我已经尝试过使用TestScheduler
对象做几件事,但是我无法使其正常工作。我在Internet上找到的大多数示例都在处理一个返回Observable
对象的函数,因此相关的单元测试可以运行scheduler.advanceTimeBy(...);
。在我的情况下,由于无法直接返回Observable
对象,因此无法应用此方法。
非常感谢您的帮助!
我正在尝试向以下代码段添加一些单元测试:public List
我不确定所发布的代码是否符合您的期望。您的getStuffByToken(...)
方法实际上是在调用线程上而不是Scheduler
进行调用。
为简单起见,我将Token
替换为Integer
,将Stuff
替换为String
。
我的getStuffByToken(...)
将返回String
的Integer
表示形式,还将包括当前Thread
的名称:
private List<String> getStuffByToken( Integer token )
{
return Arrays.asList( token.toString(), Thread.currentThread().getName() );
}
我可能使用的是RxJava的不同版本,我没有toBlocking(...)
方法,但是有blockingIterable()
-我希望这是等效的:
public List<String> extractStuff(Scheduler scheduler, List<Integer> tokens) {
List<Observable<List<String>>> observables = tokens
.stream()
.map(token-> Observable.just(getStuffByToken(token)).subscribeOn(scheduler))
.collect(Collectors.toList());
List<String> result = new ArrayList<>();
for (List<String> stuff: Observable.merge(observables).blockingIterable()) {
result.addAll(stuff);
}
return result;
}
如果我们测试以上内容:
@Test
public void testExtractStuff()
{
List<Integer> tokens = Arrays.asList( 1, 2, 3, 4, 5 );
List<String> result = extractStuff( Schedulers.computation(), tokens );
System.out.println( result );
}
我们回来:
[1, main, 3, main, 4, main, 5, main, 2, main]
如您所知,所有getStuffByToken(...)
调用均在main Thread
上执行。
接下来,未调用您的测试方法的原因是,由于TestScheduler
要求调用TestScheduler.advanceTimeBy(...)
来模拟导致处理Rx管道的时间流逝。由于您的方法是阻塞方法,因此使用TestScheduler
进行测试将不太方便。
考虑到以上两个信息,建议您按照以下步骤进行操作:
public Single<List<String>> extractStuff( Scheduler scheduler, List<Integer> tokens )
{
return Observable.fromIterable( tokens )
.flatMap( token -> Observable.just( token )
.subscribeOn( scheduler )
.map( this::getStuffByToken )
.flatMap( Observable::fromIterable ))
.toList();
}
您的生产代码可以调用extractStuff(...).blockingGet()
来解析List
。
而且,您可以进行如下测试:
@Test
public void testExtractStuff()
{
TestScheduler scheduler = new TestScheduler();
List<Integer> tokens = Arrays.asList( 1, 2, 3, 4, 5 );
TestObserver<List<String>> test = extractStuff( scheduler, tokens ).test();
scheduler.advanceTimeBy( 1, TimeUnit.SECONDS );
test.assertValueCount( 1 );
test.assertValue( list -> list.size() == 10 );
test.assertComplete();
}