对数组中的每个项目进行多个 api 调用的批量操作

问题描述 投票:0回答:1

我正在尝试构建一个批处理过程,迭代可观察的“客户”对象数组...

  • 构建一个请求对象来调用api,返回一个observable
  • 根据结果,构建另一个请求对象来调用第二个 api,返回一个 observable
  • 记录一切完成后要显示的结果数组

我不知道如何知道一切何时完成。我想我需要通过管道 mergeMap() 加上 Finalize,但我在计算细节时遇到了困难。

private batchAdd(customers: CustomerModel[]):void {
this.isCustomersTableLoading = true;
const customers$ = from(customers);
customers$
  .pipe(
    mergeMap(async (customer, index) => [await this.mymapper(customer), index], 1),
    finalize(() => {
      this.isCustomersTableLoading = false;
      //NOTE - show results
    })
  )
  .subscribe((res) => { console.debug('res',res); });
}

private mymapper(customer: CustomerModel): Observable<ResponseModel<QuickbooksOnlineCustomerModel>> {
 let qboCustomerAdd: QuickbooksOnlineAddCustomerModel = {
   //initialize from customer
 }
 //call QBO
 this.quickbooksOnlineService.addCustomer(qboCustomerAdd)
  .subscribe(
   async res => {
    //do success stuff, call another api, record results
   },
   (err) => { //do err stuff });

  //NOTE - I don't know what to return for mergeMap()
}
angular rxjs
1个回答
0
投票

如果我理解问题正确,我会进行如下操作

private batchAdd(customers: CustomerModel[]):void {
   this.isCustomersTableLoading = true;
   const customers$ = from(customers);
   customers$
     .pipe(
       // use mergeMap if you want to control how many requests you
       // want on-flight - consider that if you want just one request
       // executing at a time, then you can also use concatMap, which is
       // equivalent to mergeMap with concurrency set to 1 (which is
       // what you are doing in your code).
       // You do not need to use async/await, mergeMap can return
       // an Observable, which is the idiomatic way to use this operator.
       // therefore you need to change mymapper so that it returns an Observable
       mergeMap((customer, index) => this.mymapper(customer), 1),
       // You do not need to use finalize here. You can use error and complete
       // in the subscribe. Consider that subscribe should receive, as 
       // parameter, an object with 3 properties (next, error and complete)
       // each pointing to a proper function
     )
     .subscribe({
         next: (res) => { console.debug('res',res); },
         error: (err) => { // handle error here },
         complete: () => {
            this.isCustomersTableLoading = false;
            //NOTE - show results
         }
     });
}

private mymapper(customer: CustomerModel): Observable<ResponseModel<QuickbooksOnlineCustomerModel>> {
   let qboCustomerAdd: QuickbooksOnlineAddCustomerModel = {
     //initialize from customer
   }
   // return the Observable that represents the call to QBO, whatever this is.
   // I assume this.quickbooksOnlineService.addCustomer returns an Observable.
   // You must not subscribe here, but you can pipe other actions
   // to perform further API calls
   return this.quickbooksOnlineService.addCustomer(qboCustomerAdd).pipe(
      // concatenate further API calls
      concatMap(resOfQBO => {
         // callToOtherAPI is a function that returns an Observable
         // representing the call to another API
         return this.callToOtherAPI(resOfQBO).pipe(
            // here you can merge the return of the other API with
            // resOfQBO and return the desired value using map
            map(resOfOtherAPI => {
              // return the merged result, e.g.
              return {...resOfQBO, ...resOfOtherAPI}
            })
         )
      })
   )
}
© www.soinside.com 2019 - 2024. All rights reserved.