我正在尝试构建一个批处理过程,迭代可观察的“客户”对象数组...
我不知道如何知道一切何时完成。我想我需要通过管道 mergeMap() 加上 Finalize,但我在计算细节时遇到了困难。
private batchAdd(customers: CustomerModel[]):void {
this.isCustomersTableLoading = true;
const customers$ = from(customers);
customers$
.pipe(
mergeMap(async (customer, index) => [await this.mymapper(customer), index], 1),
finalize(() => {
this.isCustomersTableLoading = false;
//NOTE - show results
})
)
.subscribe((res) => { console.debug('res',res); });
}
private mymapper(customer: CustomerModel): Observable<ResponseModel<QuickbooksOnlineCustomerModel>> {
let qboCustomerAdd: QuickbooksOnlineAddCustomerModel = {
//initialize from customer
}
//call QBO
this.quickbooksOnlineService.addCustomer(qboCustomerAdd)
.subscribe(
async res => {
//do success stuff, call another api, record results
},
(err) => { //do err stuff });
//NOTE - I don't know what to return for mergeMap()
}
如果我理解问题正确,我会进行如下操作
private batchAdd(customers: CustomerModel[]):void {
this.isCustomersTableLoading = true;
const customers$ = from(customers);
customers$
.pipe(
// use mergeMap if you want to control how many requests you
// want on-flight - consider that if you want just one request
// executing at a time, then you can also use concatMap, which is
// equivalent to mergeMap with concurrency set to 1 (which is
// what you are doing in your code).
// You do not need to use async/await, mergeMap can return
// an Observable, which is the idiomatic way to use this operator.
// therefore you need to change mymapper so that it returns an Observable
mergeMap((customer, index) => this.mymapper(customer), 1),
// You do not need to use finalize here. You can use error and complete
// in the subscribe. Consider that subscribe should receive, as
// parameter, an object with 3 properties (next, error and complete)
// each pointing to a proper function
)
.subscribe({
next: (res) => { console.debug('res',res); },
error: (err) => { // handle error here },
complete: () => {
this.isCustomersTableLoading = false;
//NOTE - show results
}
});
}
private mymapper(customer: CustomerModel): Observable<ResponseModel<QuickbooksOnlineCustomerModel>> {
let qboCustomerAdd: QuickbooksOnlineAddCustomerModel = {
//initialize from customer
}
// return the Observable that represents the call to QBO, whatever this is.
// I assume this.quickbooksOnlineService.addCustomer returns an Observable.
// You must not subscribe here, but you can pipe other actions
// to perform further API calls
return this.quickbooksOnlineService.addCustomer(qboCustomerAdd).pipe(
// concatenate further API calls
concatMap(resOfQBO => {
// callToOtherAPI is a function that returns an Observable
// representing the call to another API
return this.callToOtherAPI(resOfQBO).pipe(
// here you can merge the return of the other API with
// resOfQBO and return the desired value using map
map(resOfOtherAPI => {
// return the merged result, e.g.
return {...resOfQBO, ...resOfOtherAPI}
})
)
})
)
}