RxJS执行多个异步操作并等待其完成的顺序执行。

问题描述 投票:1回答:1
/// ----------w1-----w2-----------------

/// -----------------------C1---------C2

存在一个写缓冲区,在这里可以发出命令,一旦完成,就会调用与之相关的回调fn。

要求写命令应按顺序执行,例如,if w1 命令,之后 w2, w2 应执行一次 w1 是完成。w1w2 应该调用各自的完成函数 C1, C2 一旦他们完成。

它是用RxJs实现的,它的工作符合规范,但有一些痛点。我没有使用回调,而是使用 await 来使代码流线性化。

function atomicWrite$(time, cmd) {
  return timer(time).pipe(
    take(1),
    tap(() => console.log('done cmd: ', cmd))
  );
}


const commandList = [];
let signalNextWrite$ = new ReplaySubject(1);

async function monsterProcess$(index, time) {

  console.log('entered loop', index);
  await signalNextWrite$.pipe(filter(idx => index === idx), take(1)).toPromise();

  console.log('processing start:', index, time);

  await of(commandList[index]).pipe(
    concatMap(cmd => atomicWrite$(time, cmd))
  ).toPromise();

  console.log('processing end:', index, time);

  signalNextWrite$.next(index + 1);

  return EMPTY.toPromise();
}

function writeIssue(command, time) {
  commandList.push(command);
  return monsterProcess$(commandList.length - 1, time);
}

signalNextWrite$.next(0);


setTimeout(async () => {
  const res = await writeIssue('windows xp2', 5000);
  console.log('completed windows');
});

setTimeout(async () => {
  const res = await writeIssue('mac siera', 1000);
  console.log('completed mac');
});

问题是,我没有使用回调,而是使用了 await 来使代码线性流动。signalNextWrite$在这里,发出的命令的index(idx)作为过滤器,可以阻止其他写命令的执行。

希望摆脱数组索引作为锁的逻辑,而使用其他逻辑来等待,等待当前命令完成并发出完成信号。

也想避免这种代码。

signalNextWrite$.next(0);

Stackblitz例如: https:/stackblitz.comedittypescript-r45p2d)。

rxjs mutex
1个回答
© www.soinside.com 2019 - 2024. All rights reserved.