我终于用Reactor学习函数式编程了。所以我是新手。
我想做的第一件事是使用WebClient调用外部API。这个调用需要递归,因为响应提供了调用参数的下一个值,我需要在下一次调用时使用它,直到满足琐碎的情况。
这是我们提出的:
Flux.from(p -> queryUntilNow())
.flatMap(res -> // res is object )
.subscribe( // process )
private Flux<ApiResp> queryUntilNow() {
return Flux.from(p -> {
queryAPI(since)
.doOnError(System.out::println)
.subscribe(apiResp -> {
if (since == apiResp.last) return;
since = apiResp.last;
queryUntilNow();
});
});
}
private Flux<ApiResp> queryAPI(int last) {
Flux<ApiResp> resp = kapi.get()
.uri("/OHLC?pair={pair}&since={since}&interval={int}", pair, last, interval)
.retrieve()
.bodyToFlux(ApiResp.class);
return resp;
}
好像我需要更多地调整我对这种编程风格的想法,所以请给我一些例子和解释。
谢谢!
如果您只需要循环遍历批量返回的线性结果(而不是递归树),则可以使用重复通量,其起点在每次重复时发生变化。
这是一个完整的例子,只是模拟api调用。您可以在queryFrom
中的WebClient调用中替换:
public class Main {
private static class ApiResp {
private final int last;
private ApiResp(int last) {
this.last = last;
}
}
public static void main(String[] args) {
queryBetween(0, 15)
.doOnNext(apiResp -> System.out.println(apiResp.last))
.blockLast();
}
public static Flux<ApiResp> queryBetween(int startInclusive, int endExclusive) {
// The starting point of the next iteration
final AtomicReference<Integer> nextIterationStart = new AtomicReference<>(startInclusive);
return Flux
// defer will cause a new Flux with a new starting point to be created for each subscription
.defer(() -> queryFrom(nextIterationStart.get()))
// update the starting point of the next iteration
.doOnNext(apiResp -> nextIterationStart.set(apiResp.last + 1))
// repeat with a new subscription if we haven't reached the end yet
.repeat(() -> nextIterationStart.get() < endExclusive)
// make sure we didn't go past the end if queryFrom returned more results than we need
.takeWhile(apiResp -> apiResp.last < endExclusive);
}
public static Flux<ApiResp> queryFrom(int start) {
// simulates an api call that always returns 10 results from the starting point
return Flux.range(start, 10)
.map(ApiResp::new);
}
}