我是RxJS observables的新手,我正试图解决一个相当简单的用例。
在服务中,我首先进行一个返回项目的http调用(作为一个observable)。该项包含一组id,其中一些重复。对于每个不同的id,我需要调用另一个http服务(再次返回一个observable),并将其返回值添加到原始项目而不是相应的id。这些调用应该并行发生。最后,每次调用完成后,我的服务应返回原始项目的可观察项,现在其子项已到位。
为了给出一个更好的想法,这就是承诺而不是可观察的东西:
MyService() {
return HttpGetMainItem()
.then(item => {
var promises = _.uniq(item.subItems)
.map(sid => HttpGetSubItem(sid)
.then(subItem => {
// add to matching item.subItems
}));
// wait for all promises to complete and return main item
return Promise.all(promises).then(() => item);
});
}
用observables完成这项工作的最佳方法是什么?
编辑:从答案来看,我似乎不是很清楚。使用promises的例子只是为了清楚,在我的例子中,http调用实际上是Angular的HttpClient.get,所以它们返回observables-我希望用observables做所有事情。
这是使用rxjs完成上述操作的一种方法。
from
将promise转换为observable。switchMap
,以便您可以调用另一个observable并将其作为外部observable的结果返回。你正在切换执行上下文。forkJoin
创建所有元素的promise的Observable。一旦所有承诺完成,ForkJoin将发出所有结果的数组。这就像promise.all
。码
from(HttpGetMainItem()).pipe(
switchMap(item =>
forkJoin(_.uniq(item.subItems).map(sid => HttpGetSubItem(sid)))
.pipe(
map(results => {
/* add results to matching items.subItems and */
return item;
})
)
)
);
我觉得这看起来有点笨重,因为需要保留根项和它所需的嵌套。您可以使用switchMap的selector参数来组合外部和内部可观察对象。您可以使用该参数代替map()
中的逻辑,并且由于两个observable的结果都已通过,因此您不需要任何进一步的嵌套。
from(HttpGetMainItem()).pipe(
switchMap(
item => forkJoin(_.uniq(item.subItems).map(sid => HttpGetSubItem(sid))),
(item, results) => {
/* add results to matching items.subItems and */
return item;
}
)
);
我相信你需要这样的东西:
import { from, forkJoin } from 'rxjs';
import { switchMap } from 'rxjs/operators';
itemWithSubItems$ = from(HttpGetMainItem()).pipe(
switchMap(item => {
const promises = _.uniq(item.subItems).map(item => HttpGetSubItem(item.sid));
return forkJoin(promises)
.map(resultArray => item.subItems = resultArray)
.map(_ => item);
})
);
首先获取主项目。然后使用forkJoin解析所有子查询并丰富主项。之后,只需返回主要项目。
也许你可以使用像async.each这样的库 - > https://caolan.github.io/async/docs.html#each(也许每个系列)。
会是这样的:
async.each(array, (obj, cb) => {
observable with obj in parameter and with subscriber result :
cb(err, subscriberRes);
}, (err, res) => {
console.log(res);
}