尝试使用 create 方法创建字符串流(Flux),但它不起作用

问题描述 投票:0回答:1

我试图通过多次查询某些 API 端点来创建 Flux。我收到的有效负载有一个变量,指示数据是否已用尽,这意味着在此 JSON 有效负载中,有一个

done
变量,该变量为
false
,同时仍有数据要获取,并且在它被获取时设置为
true
已全部取走。然后我会完成这个 Flux。

我正在尝试完成这项工作,但失败了,所以为了避免让事情复杂化,我创建了一个更简单的代码,仅为本示例发出字符串。

我将模拟的接口,但这会返回

Mono<String>
,但在现实生活中,这将返回我从 API 获取的这条记录的
Mono

public interface RecordProvider {
    Mono<String> fetchString();
}

然后在应该处理这些的服务中我有这些方法:

private final AtomicInteger stringCounter = new AtomicInteger(0);

public Flux<String> assembleStringsFlux() {
    return Flux.<String>create(this::generateStringMono);
}

private void generateStringMono(FluxSink<String> fluxSink) {
    recordProvider.fetchString()
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe(
                    stringResult -> {
                        if (stringCounter.get() == 5) {
                            fluxSink.complete();
                        } else {
                            fluxSink.next(stringResult);
                        }
                    },
                    fluxSink::error);
}

如您所见,我将计算获取的字符串/项目的数量,5 后将完成。这不是现实生活中的场景,但这只是为了测试。

我尝试在我的测试中使其工作如下:

@Test
void givenString() {
    // given
    // when
    when(recordProvider.fetchString()).thenReturn((Mono<String>) Mono.just("bla"));

    var result = recordProcessor.assembleStringsFlux()
            .map(someString -> {
                System.out.println("whaaaat = " + someString);
                return someString;
            }).subscribeOn(Schedulers.boundedElastic());

    // then
    StepVerifier.create(result)
            .expectNextMatches(stringSignal -> stringSignal.equals("bla"))
            .expectNextMatches(stringSignal -> stringSignal.equals("bla"))
            .expectNextMatches(stringSignal -> stringSignal.equals("bla"))
            .expectNextMatches(stringSignal -> stringSignal.equals("bla"))
            .verifyComplete();
}

当我运行此测试时,只有一个字符串被打印到日志中,然后看起来它只是挂起,直到它以错误结束。

你知道我做错了什么吗?

更新 1:好吧,正如我担心的那样,我想单个订阅是不够的。因此,我需要一个循环来获取这些元素,直到满足完成 Flux 的条件。但我想知道的是,如果我访问一个返回

Mono
的方法,我真的应该这样做吗:

    return Flux.create(fluxSink -> {
            do {
                var myStr = recordProvider.fetchString().block();
                fluxSink.next(myStr);
                stringCounter.incrementAndGet();
            } while (stringCounter.get() < 5);
            fluxSink.complete();

因为我不想打电话

block()
并且我想知道如何一遍又一遍地订阅?

spring-boot project-reactor flux
1个回答
0
投票

好吧,我明白了,我犯了一个错误,一个很大的忽视......

generateStringMono 方法应该如下所示:

private void generateStringMono(FluxSink<String> fluxSink) {
    recordProvider.fetchString()
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe(
                    stringResult -> {
                        if (stringCounter.get() == 5) {
                            fluxSink.complete();
                        } else {
                            fluxSink.next(stringResult);
                            generateStringMono(sluxSink); // <-- forgot this
                        }
                    },
                    fluxSink::error);
}

所以基本上我需要再次调用该方法。

同样在测试中,不应该使用

thenReturn
,而是使用
thenAnswer
,它应该看起来像这样:

when(recordProvider.generateStringMono()).thenAnswer(new Answer<Mono<String>>() {
            @Override
            public Mono<String> answer(InvocationOnMock invocationOnMock) throws Throwable {
                return monoList.get(callCounter.getAndIncrement());
            }
        });

上面的

monoList
只是不同字符串的列表,然后我检查我的步骤验证器,以便它并不总是相同。

© www.soinside.com 2019 - 2024. All rights reserved.