RxJS和Angular HttpClient:将forkJoin用于多个顺序请求

问题描述 投票:0回答:2

在RxJS(ES6)中,我试图在单个Observable中按顺序进行一系列操作来获取结果。

我不确定是否必须使用forkJoin(但是我想按顺序执行操作)或concat运算符(但是我想在所有它们都执行完后才通知我)。

我尝试:

forkJoin

  sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        map(products => {
            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            return forkJoin(batch);
        })
    );
  }

concat

  sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        map(products => {
            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            return concat(batch);
        })
    );
  }

在两种情况下,我都看不到来自HttpClient的可观察对象(它们未发送http请求),但我可以看到日志记录。

批处理中的被调用方法如下:

  updateProduct(product: Product) {
      console.log('calling...');
      const options = this.getDefaultRequestOptions();
      return this.http.post('update_product', product, options);
  }

我像下面这样调用sync()函数:

this.productService.sync().subscribe(() => {
  console.log('sync done');
}, error => this.utils.handleError(error));

输出:

(8) calling...
sync done

但是没有HTTP请求开始。

如果在管道图中在forkJoin / concat之外执行相同的操作,则可以看到HTTP请求已发送。

sync(): Observable<any> {

    const product = new Product(null, 'Title', 'Desc.', 'CODE');
    return this.api.updateProduct(product);

}

我想念什么?

---更新-解决方案---

sync(): Observable<any> {
    return from(this.db.getProducts()).pipe(
        flatMap(products => {

            if ( !products ) {
                return of(true);
            }

            const batch: Observable<any>[] = [];
            for ( const product of products ) {
              if ( product.toBeSync ) {
                  batch.push(this.api.updateProduct(product));
              }
            }

            console.log(batch);

            return concat(...batch);
            // return forkJoin(batch);
        })
    );
typescript rxjs concat angular-httpclient fork-join
2个回答
1
投票

尝试

sync(): Observable<any> {
  return from(this.db.getProducts()).pipe(
    map(products => {
      const batch: Observable<any>[] = [];
      for (const product of products) {
        batch.push(this.api.updateProduct(product));
      }
     return batch;
   }),
   switchMap(batch => forkJoin(batch)),
  )
}

这假设this.db.getProducts()是同步静态数据,而不是可观察/异步数据。

然后您可以尝试做

  this.productService.sync().subscribe(() => { console.log('sync done'); });

并查看是否正在进行任何API调用。


1
投票

您的问题不是这里的forkJoin/concat,它是使用map返回另一个可观察值。

在您的代码中,map将返回Observable<Observable<any>>。您的IDE应该突出显示这些内容(不要使用any,也许这是个问题)。

您的解决方案是将map更改为concatMap(至少用于测试),下一个chain应该起作用。

此外,调试subscribe及其返回值以查看所拥有的内容。

© www.soinside.com 2019 - 2024. All rights reserved.