如何等待带有promise的observable的每个值

问题描述 投票:1回答:2

假设我有这个可观察的:

const obs = new Observable((observer) => {
    observer.next(0.25);
    observer.next(0.75);
    observer.next(new ArrayBuffer(100));
    observer.complete();
});

我怎么能等待承诺的每个价值?

以下代码仅返回最后一个值(调用complete()之前的值):

const value = await obs.toPromise();

但我希望能够在整个过程中获得每个价值。我可以这样做:

const value1 = await obs.pipe(take(1)).toPromise();
const value2 = await obs.pipe(take(2)).toPromise();

但这并不理想,因为我每次都必须增加数字,并且take(1000)仍然会在示例中返回一些内容,即使只有3个值。我正在寻找类似的东西:

const value1 = await obs.pipe(next()).toPromise(); // 0.25
const value2 = await obs.pipe(next()).toPromise(); // 0.75
const value3 = await obs.pipe(next()).toPromise(); // ArrayBuffer(100)
const value4 = await obs.pipe(next()).toPromise(); // null

这更类似于发电机。

有没有办法完成这样的事情?

javascript promise rxjs observable rxjs6
2个回答
0
投票

看起来你要求的是一种将observable转换为异步迭代的方法,这样你就可以“手动”或使用新的for-await-of语言功能异步迭代它的值。

Heres是一个如何做到这一点的例子(我没有测试过这段代码,所以它可能有一些bug):

// returns an asyncIterator that will iterate over the observable values
function asyncIterator(observable) {
  const queue = []; // holds observed values not yet delivered
  let complete = false;
  let error = undefined;
  const promiseCallbacks = [];

  function sendNotification() {
    // see if caller is waiting on a result
    if (promiseCallbacks.length) {
      // send them the next value if it exists
      if (queue.length) {
        const value = queue.shift();
        promiseCallbacks.shift()[0]({ value, done: false });
      }
      // tell them the iteration is complete
      else if (complete) {
        while (promiseCallbacks.length) {
          promiseCallbacks.shift()[0]({ done: true });
        }
      }
      // send them an error
      else if (error) {
        while (promiseCallbacks.length) {
          promiseCallbacks.shift()[1](error);
        }
      }
    }
  }

  observable.subscribe(
    value => {
      queue.push(value);
      sendNotification();
    },
    err => {
      error = err;
      sendNotification();
    },
    () => {
      complete = true;
      sendNotification();
    });

  // return the iterator
  return {
    next() {
      return new Promise((resolve, reject) => {
        promiseCallbacks.push([resolve, reject]);
        sendNotification();
      });
    }
  }
}

与for-wait-of语言功能一起使用:

async someFunction() {
  const obs = ...;
  const asyncIterable = {
    [Symbol.asyncIterator]: () => asyncIterator(obs)
  };

  for await (const value of asyncIterable) {
    console.log(value);
  }
}

使用时无需等待语言功能:

async someFunction() {
  const obs = ...;
  const it = asyncIterator(obs);

  while (true) {
    const { value, done } = await it.next();
    if (done) {
      break;
    }

    console.log(value);
  }
}

0
投票

这可能只是工作,因为take(1)完成了observable,然后它被await消耗,接下来的一行将产生第二次发射到value2。

const observer= new Subject()
async function getStream(){
  const value1 = await observer.pipe(take(1)).toPromise() // 0.25
  const value2 = await observer.pipe(take(1)).toPromise() // 0.75
  return [value1,value2]
}
getStream().then(values=>{
  console.log(values)
})
//const obs = new Observable((observer) => {
 setTimeout(()=>{observer.next(0.25)},1000);
 setTimeout(()=>observer.next(0.75),2000);

更新:使用主题发射。

© www.soinside.com 2019 - 2024. All rights reserved.