假设我有这个可观察的:
const obs = new Observable((observer) => {
observer.next(0.25);
observer.next(0.75);
observer.next(new ArrayBuffer(100));
observer.complete();
});
我怎么能等待承诺的每个价值?
以下代码仅返回最后一个值(调用complete()之前的值):
const value = await obs.toPromise();
但我希望能够在整个过程中获得每个价值。我可以这样做:
const value1 = await obs.pipe(take(1)).toPromise();
const value2 = await obs.pipe(take(2)).toPromise();
但这并不理想,因为我每次都必须增加数字,并且take(1000)
仍然会在示例中返回一些内容,即使只有3个值。我正在寻找类似的东西:
const value1 = await obs.pipe(next()).toPromise(); // 0.25
const value2 = await obs.pipe(next()).toPromise(); // 0.75
const value3 = await obs.pipe(next()).toPromise(); // ArrayBuffer(100)
const value4 = await obs.pipe(next()).toPromise(); // null
这更类似于发电机。
有没有办法完成这样的事情?
看起来你要求的是一种将observable转换为异步迭代的方法,这样你就可以“手动”或使用新的for-await-of语言功能异步迭代它的值。
Heres是一个如何做到这一点的例子(我没有测试过这段代码,所以它可能有一些bug):
// returns an asyncIterator that will iterate over the observable values
function asyncIterator(observable) {
const queue = []; // holds observed values not yet delivered
let complete = false;
let error = undefined;
const promiseCallbacks = [];
function sendNotification() {
// see if caller is waiting on a result
if (promiseCallbacks.length) {
// send them the next value if it exists
if (queue.length) {
const value = queue.shift();
promiseCallbacks.shift()[0]({ value, done: false });
}
// tell them the iteration is complete
else if (complete) {
while (promiseCallbacks.length) {
promiseCallbacks.shift()[0]({ done: true });
}
}
// send them an error
else if (error) {
while (promiseCallbacks.length) {
promiseCallbacks.shift()[1](error);
}
}
}
}
observable.subscribe(
value => {
queue.push(value);
sendNotification();
},
err => {
error = err;
sendNotification();
},
() => {
complete = true;
sendNotification();
});
// return the iterator
return {
next() {
return new Promise((resolve, reject) => {
promiseCallbacks.push([resolve, reject]);
sendNotification();
});
}
}
}
与for-wait-of语言功能一起使用:
async someFunction() {
const obs = ...;
const asyncIterable = {
[Symbol.asyncIterator]: () => asyncIterator(obs)
};
for await (const value of asyncIterable) {
console.log(value);
}
}
使用时无需等待语言功能:
async someFunction() {
const obs = ...;
const it = asyncIterator(obs);
while (true) {
const { value, done } = await it.next();
if (done) {
break;
}
console.log(value);
}
}
这可能只是工作,因为take(1)完成了observable,然后它被await消耗,接下来的一行将产生第二次发射到value2。
const observer= new Subject()
async function getStream(){
const value1 = await observer.pipe(take(1)).toPromise() // 0.25
const value2 = await observer.pipe(take(1)).toPromise() // 0.75
return [value1,value2]
}
getStream().then(values=>{
console.log(values)
})
//const obs = new Observable((observer) => {
setTimeout(()=>{observer.next(0.25)},1000);
setTimeout(()=>observer.next(0.75),2000);
更新:使用主题发射。