使用WebClient和Reactor 3.0进行递归API调用

问题描述 投票:0回答:1

我终于用Reactor学习函数式编程了。所以我是新手。

我想做的第一件事是使用WebClient调用外部API。这个调用需要递归,因为响应提供了调用参数的下一个值,我需要在下一次调用时使用它,直到满足琐碎的情况。

这是我们提出的:

    Flux.from(p -> queryUntilNow())
            .flatMap(res -> // res is object )
            .subscribe( // process )



private Flux<ApiResp> queryUntilNow() {
    return Flux.from(p -> {
        queryAPI(since)
                .doOnError(System.out::println)
                .subscribe(apiResp -> {
                    if (since == apiResp.last) return;

                    since = apiResp.last;
                    queryUntilNow();
                });
    });
}

private Flux<ApiResp> queryAPI(int last) {
    Flux<ApiResp> resp = kapi.get()
            .uri("/OHLC?pair={pair}&since={since}&interval={int}", pair, last, interval)
            .retrieve()
            .bodyToFlux(ApiResp.class);

    return resp;
}

好像我需要更多地调整我对这种编程风格的想法,所以请给我一些例子和解释。

谢谢!

java spring-webflux project-reactor
1个回答
0
投票

如果您只需要循环遍历批量返回的线性结果(而不是递归树),则可以使用重复通量,其起点在每次重复时发生变化。

这是一个完整的例子,只是模拟api调用。您可以在queryFrom中的WebClient调用中替换:

public class Main {

    private static class ApiResp {
        private final int last;
        private ApiResp(int last) {
            this.last = last;
        }
    }

    public static void main(String[] args) {
        queryBetween(0, 15)
                .doOnNext(apiResp -> System.out.println(apiResp.last))
                .blockLast();
    }

    public static Flux<ApiResp> queryBetween(int startInclusive, int endExclusive) {
        // The starting point of the next iteration
        final AtomicReference<Integer> nextIterationStart = new AtomicReference<>(startInclusive);
        return Flux
                // defer will cause a new Flux with a new starting point to be created for each subscription
                .defer(() -> queryFrom(nextIterationStart.get()))
                // update the starting point of the next iteration
                .doOnNext(apiResp -> nextIterationStart.set(apiResp.last + 1))
                // repeat with a new subscription if we haven't reached the end yet
                .repeat(() -> nextIterationStart.get() < endExclusive)
                // make sure we didn't go past the end if queryFrom returned more results than we need
                .takeWhile(apiResp -> apiResp.last < endExclusive);
    }

    public static Flux<ApiResp> queryFrom(int start) {
        // simulates an api call that always returns 10 results from the starting point
        return Flux.range(start, 10)
                .map(ApiResp::new);
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.