我试图通过多次查询某些 API 端点来创建 Flux。我收到的有效负载有一个变量,指示数据是否已用尽,这意味着在此 JSON 有效负载中,有一个
done
变量,该变量为 false
,同时仍有数据要获取,并且在它被获取时设置为 true
已全部取走。然后我会完成这个 Flux。
我正在尝试完成这项工作,但失败了,所以为了避免让事情复杂化,我创建了一个更简单的代码,仅为本示例发出字符串。
我将模拟的接口,但这会返回
Mono<String>
,但在现实生活中,这将返回我从 API 获取的这条记录的 Mono
。
public interface RecordProvider {
Mono<String> fetchString();
}
然后在应该处理这些的服务中我有这些方法:
private final AtomicInteger stringCounter = new AtomicInteger(0);
public Flux<String> assembleStringsFlux() {
return Flux.<String>create(this::generateStringMono);
}
private void generateStringMono(FluxSink<String> fluxSink) {
recordProvider.fetchString()
.subscribeOn(Schedulers.boundedElastic())
.subscribe(
stringResult -> {
if (stringCounter.get() == 5) {
fluxSink.complete();
} else {
fluxSink.next(stringResult);
}
},
fluxSink::error);
}
如您所见,我将计算获取的字符串/项目的数量,5 后将完成。这不是现实生活中的场景,但这只是为了测试。
我尝试在我的测试中使其工作如下:
@Test
void givenString() {
// given
// when
when(recordProvider.fetchString()).thenReturn((Mono<String>) Mono.just("bla"));
var result = recordProcessor.assembleStringsFlux()
.map(someString -> {
System.out.println("whaaaat = " + someString);
return someString;
}).subscribeOn(Schedulers.boundedElastic());
// then
StepVerifier.create(result)
.expectNextMatches(stringSignal -> stringSignal.equals("bla"))
.expectNextMatches(stringSignal -> stringSignal.equals("bla"))
.expectNextMatches(stringSignal -> stringSignal.equals("bla"))
.expectNextMatches(stringSignal -> stringSignal.equals("bla"))
.verifyComplete();
}
当我运行此测试时,只有一个字符串被打印到日志中,然后看起来它只是挂起,直到它以错误结束。
你知道我做错了什么吗?
更新 1:好吧,正如我担心的那样,我想单个订阅是不够的。因此,我需要一个循环来获取这些元素,直到满足完成 Flux 的条件。但我想知道的是,如果我访问一个返回
Mono
的方法,我真的应该这样做吗:
return Flux.create(fluxSink -> {
do {
var myStr = recordProvider.fetchString().block();
fluxSink.next(myStr);
stringCounter.incrementAndGet();
} while (stringCounter.get() < 5);
fluxSink.complete();
因为我不想打电话
block()
并且我想知道如何一遍又一遍地订阅?
好吧,我明白了,我犯了一个错误,一个很大的忽视......
generateStringMono 方法应该如下所示:
private void generateStringMono(FluxSink<String> fluxSink) {
recordProvider.fetchString()
.subscribeOn(Schedulers.boundedElastic())
.subscribe(
stringResult -> {
if (stringCounter.get() == 5) {
fluxSink.complete();
} else {
fluxSink.next(stringResult);
generateStringMono(sluxSink); // <-- forgot this
}
},
fluxSink::error);
}
所以基本上我需要再次调用该方法。
同样在测试中,不应该使用
thenReturn
,而是使用thenAnswer
,它应该看起来像这样:
when(recordProvider.generateStringMono()).thenAnswer(new Answer<Mono<String>>() {
@Override
public Mono<String> answer(InvocationOnMock invocationOnMock) throws Throwable {
return monoList.get(callCounter.getAndIncrement());
}
});
上面的
monoList
只是不同字符串的列表,然后我检查我的步骤验证器,以便它并不总是相同。