在Rxjs中,有一个管道takeUntil,但是没有一个管道wait直到] >>,这使得当前的Observable等待第二个Observable发出。 我的最终目标是让许多Observable仍然等待,直到我的Observable X发出一个值来继续执行它们。这样我的Observable X必须执行一次并保留该值,所以我必须使用BehaviourSubject。
在这个简单的例子中,我想用waitUntil
代替takeUntil,所以源必须等待2秒(在wait $发出之后)才能开始发出其值。import { interval, timer } from 'rxjs'; import { takeUntil, skipWhile } from 'rxjs/operators'; import { BehaviorSubject } from 'rxjs'; const wait$ = new BehaviorSubject(null); //emits value every 500ms const source = interval(500) .pipe( takeUntil( wait$.pipe(skipWhile((val => val == null))) ) ); source.subscribe(val => console.log(val)); setTimeout(()=>{ wait$.next(1); },2000);
在Rxjs中,存在管道takeUntil,但是没有管道直等到,这使得当前的Observable等待第二秒Observable发出。我的最终目标是使许多可观察的物体仍然...
如果要“等待”另一个可观察的物体,只需使用concatMap
或switchMap
切换到它即可。
在您的示例中,您使用了BehaviorSubject
,因此等待它的所有内容都将立即执行。
如果我理解你的问题,我会看到2个潜在案例。
第一个是source
可观察对象开始独立于wait$
发出其值。当wait$
发出时,才开始使用source
发出的值。可以使用combineLatest
函数来实现此行为,如下所示:>
//emits value every 500ms
const source$ = interval(500);
combineLatest(source$, wait$)
.pipe(
map(([s, v]) => s), // to filter out the value emitted by wait$
take(5), // just to limit to 5 the values emitted
)
.subscribe(val => console.log(val));
setTimeout(() => {
wait$.next(1);
}, 2000);