在RxJS(ES6)中,我试图在单个Observable中按顺序进行一系列操作来获取结果。
我不确定是否必须使用forkJoin(但是我想按顺序执行操作)或concat运算符(但是我想在所有它们都执行完后才通知我)。
我尝试:
forkJoin
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
map(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
return forkJoin(batch);
})
);
}
concat
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
map(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
return concat(batch);
})
);
}
在两种情况下,我都看不到来自HttpClient
的可观察对象(它们未发送http请求),但我可以看到日志记录。
批处理中的被调用方法如下:
updateProduct(product: Product) {
console.log('calling...');
const options = this.getDefaultRequestOptions();
return this.http.post('update_product', product, options);
}
我像下面这样调用sync()函数:
this.productService.sync().subscribe(() => {
console.log('sync done');
}, error => this.utils.handleError(error));
输出:
(8) calling...
sync done
但是没有HTTP请求开始。
如果在管道图中在forkJoin / concat之外执行相同的操作,则可以看到HTTP请求已发送。
sync(): Observable<any> {
const product = new Product(null, 'Title', 'Desc.', 'CODE');
return this.api.updateProduct(product);
}
我想念什么?
---更新-解决方案---
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
flatMap(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
console.log(batch);
return concat(...batch);
// return forkJoin(batch);
})
);
尝试
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
map(products => {
const batch: Observable<any>[] = [];
for (const product of products) {
batch.push(this.api.updateProduct(product));
}
return batch;
}),
switchMap(batch => forkJoin(batch)),
)
}
这假设this.db.getProducts()
是同步静态数据,而不是可观察/异步数据。
然后您可以尝试做
this.productService.sync().subscribe(() => { console.log('sync done'); });
并查看是否正在进行任何API调用。
您的问题不是这里的forkJoin/concat
,它是使用map
返回另一个可观察值。
在您的代码中,map
将返回Observable<Observable<any>>
。您的IDE应该突出显示这些内容(不要使用any
,也许这是个问题)。
您的解决方案是将map
更改为concatMap
(至少用于测试),下一个chain应该起作用。
此外,调试subscribe
及其返回值以查看所拥有的内容。