要学习rxjs,我会一起玩。我的代码:
// RxJS v6+
import { withLatestFrom, map } from 'rxjs/operators';
import { interval } from 'rxjs';
const source = interval(1000);
const example = source.pipe(
map(value => value +1),
map(value => {
if(value === 40) {
finish();
}
else if (value % 5 === 0){
return 'can devide by 5 we did some magic';
}else{
return value;
} })
);
const subscribe = example.subscribe(
val => console.log(val),
error => console.log("Error handled: " , error),
() => console.log('resolved'));
我的想法是将其运行40次,然后完成可观察的操作(这可能是另一项要求,例如,在10:00时查看值是否为10(主要目标是对值进行评估并强制完成))。我正在寻找占位符finish()的替代方法,因为finish不存在。如何获得订阅方法的解析函数() => console.log('resolved')
?
我找到了How can I complete Observable in RxJS,但答案是从2015年开始的,我假设目前的rxjs版本已有答案。
两个答案都提到takeuntil和take都是正确的,但是另一种方法是使用订阅对象来取消订阅它只是另一种选择
const subx= example.subscribe(val => {
console.log(val);
if (val == 40) {
subx.unsubscribe()
}
});
更新😎
如果您有很多订阅者,并且您想放置构成可观察源的运算符的条件,可以在这里完成工作
const source = interval(1000).pipe(take(5)); // 👈
source.pipe(map(res => res * 10)).subscribe(val => {
console.log("🤯", val);
});
source.subscribe(val => {
console.log(val);
});
仍然还是一样,只需要使用管道运算符即可。您可以查看示例here
import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
const source = interval(1000);
const timer$ = timer(5000);
const example = source.pipe(takeUntil(timer$));
const subscribe = example.subscribe(val => console.log(val));
我的想法是运行40次`
为此,您可以添加take(40)
。通常,有几种运算符,例如take
,可以完成一个可观察项。签出https://www.learnrxjs.io/operators/filtering/take.html
// RxJS v6+
import { withLatestFrom, map } from 'rxjs/operators';
import { interval } from 'rxjs';
const source = interval(1000);
const example = source.pipe(
take(40),
map(value => value +1),
map(value => {
if(value === 40) {
finish();
}
else if (value % 5 === 0){
return 'can devide by 5 we did some magic';
}else{
return value;
} })
);
const subscribe = example.subscribe(
val => console.log(val),
error => console.log("Error handled: " , error),
() => console.log('resolved'));