RXJS while循环用于分页演示

问题描述 投票:5回答:2

我尝试从swap api查询所有人的数据。 URL swapi.co/api/people返回一个带有人员数组和URL(swapi.co/api/people/?page=2)的对象,我从中获取下一个数据。我想要做的是订阅方法每次新页面可用时更新角度组件。

我是反应式编程模型的新手。如何实现while循环或一系列Observable?

这适用于第一页:

getAllPeople(): Observable<Person[]> {
    let nextUrl = http://swapi.co/api;

    let source = Observable.create(observer => {

            this.http.get(nextUrl, { headers: this.headers })
            .map(response => {
                    let body = response.json();
                    nextUrl = body.next;
                    return nextUrl != null ? body.results as Person[] : null;
                }
            )
            .retry(5)//send same request based on the url of the previous request till the field next is null
            .catch(error => observer.error(error))
            .subscribe(persons => {
                    if (persons !== null) {
                        observer.next(persons)
                    }
                }
            );

        //observer.complete();
    })


    return source;
}

编辑:我添加了retry()方法。我希望我更容易理解我打算做什么。在第一个请求之后,它会向同一个URL发送更多请求。不幸的是,它不使用我从上一个请求获得的新URL。

http://swapi.co/api/?page=1 - > http://swapi.co/api/?page=2 - > http://swapi.co/api/?page=3 - > null

我不知道需要多少请求来检索所有数据。

javascript angularjs typescript rxjs reactive-programming
2个回答
9
投票

通过使用expand运算符,您可以为返回的每个值创建一个新的Observable序列。为了阻止expand创建任何新序列,一旦我们得到一个null的下一个URL,我们就可以返回一个空的Observable来停止扩展我们的序列。最后,我们可以使用reduce运算符将不同的数据页面聚合到一个数组中,然后我们将其发送回原始订阅者。这可以通过以下方式完成:

public getAllPeople(): Observable<Person[]> {
    return Observable.create(observer => {
          this.getPage("http://swapi.co/api/people/")
            .expand((data, i) => {
                return data.next ? this.getPage(data.next) : Observable.empty();
            })
            .reduce((acc, data) => {
                return acc.concat(data.results);
              }, [])
            .catch(error => observer.error(error))
            .subscribe((people) => {
                  observer.next(people);
                  observer.complete();
            });
    });
}

private getPage(url: string): Observable<{next: string, results: Person[]}> {
  return this.http.get(url, { headers: this.headers })
            .map(response => {
                    let body = response.json();
                    return {
                      next: body.next,
                      results: body.results as Person[]
                    };
                }
            );
}

Demo


1
投票

使用扩展运算符,如:

function getData(id) {
   let next = id<5 ? id+1 : null;
   let obj = {id:id, next:next}
   return next ?   Rx.Observable.of(obj) : Rx.Observable.empty();
 }

 getData(0)
   .expand(el=> getData(el.next))
   .subscribe(x=>console.log(x));