我只想在获得第一个发出的值时执行 tap()
类似:
Observable
.pipe(
tap(() => { /* execute only when I get the first emitted value */ })
)
.subscribe(() => {
// .....
})
您可以在地图运算符中使用索引,例如
concatMap
。与其他方法不同,这对所选索引是完全灵活的。假设您想点击第二次发射 index === 1
或任何谓词,例如 index % 2 === 0
// these are because of using rxjs from CDN in code snippet, ignore them
const {of, interval} = rxjs;
const {take, tap, concatMap} = rxjs.operators;
// main code
const stream = interval(250).pipe(take(4))
stream.pipe(
concatMap((value, index) => index === 0
? of(value).pipe(
tap(() => console.log('tap'))
)
: of(value)
)
)
.subscribe(x => console.log(x));
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/rxjs.umd.js"></script>
如果我正确地理解了您的想法,您只想在流订阅开始时而不是其他时间执行
tap()
。 这是我的自定义运算符:
import { Observable, of } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';
export function startWithTap<T>(callback: () => void) {
return (source: Observable<T>) =>
of({}).pipe(tap(callback), switchMap((o) => source));
}
例如,该运算符的用法是:
this.api.getData().pipe(
startWithTap(() => this.loading.start()),
)
这是我的实际代码示例,当有人 订阅 api 服务创建的 Observable(通过 httpClient)。
更新
使用这个代替上面的实现,因为这个只使用
defer
,而不是使用of
,tap
和switchMap
:
export function startWithTap<T>(callback: () => void) {
return (source: Observable<T>) =>
defer(() => {
callback();
return source;
});
}
我喜欢jal的回答的方法,并建议将其包装在自己的运算符中:
export function tapOnce<T>(tapFn: (t: T) => void, tapIndex = 0): OperatorFunction<T, T> {
return source$ => source$.pipe(concatMap((value, index) => {
if (index === tapIndex) {
tapFn(value);
}
return of(value);
}));
}
用法如下:
stream.pipe(tapOnce(() => console.log('tapping once'), 1));
这甚至可以进一步抽象为一个运算符,该运算符采用一个函数来确定是否应该根据给定的值/索引进行点击:
export function tapWhen<T>(tapFn: (t: T) => void, evaluateFn: (index: number, t: T) => boolean): OperatorFunction<T, T> {
return source$ => source$.pipe(concatMap((value, index) => {
if (evaluateFn(index, value)) {
tapFn(value);
}
return of(value);
}));
}
如果有人感兴趣,这里有一个tapN的额外简单实现。因此它会为每次发射执行指定的回调函数,直到发射数量等于nEmissions。为了仅对第一个元素执行 tap() 函数,您可以执行 tapN(1),但您也可以使用例如 tapN(3) 来执行 3 个首次发射的 tap。
/* Executes the specified callback function for each emission until the number of emissions is equal to nEmissions*/
export const tapN = <T>(nEmissions, callback: (T) => void) => (source$: Observable<T>): Observable<T> =>
defer(() => {
let counter = 0;
return source$.pipe(tap((item) => {
if (counter < nEmissions) {
callback(item);
counter++;
}
}));
});
在您的代码中:
Observable
.pipe(
tapN(1, () => { /* this code would be only executed on the first emitted value */ })
)
.subscribe(() => {
// .....
})
除了已经提到的选项之外,您还可以使用
multicast
。
multicast(new Subject(), s => concat(
s.pipe(
take(1),
tap(v => console.log('tap', v)),
),
s
)
您可以像下面这样分享()您的主要 Observable:
import { timer, of, BehaviorSubject, interval } from 'rxjs';
import { tap, mapTo, share, shareReplay, } from 'rxjs/operators';
const source$ = timer(1000)
.pipe(
tap((v) => console.log('SIDE EFFECT')),
mapTo('RESULT')
)
const sharedSource$ = source$.pipe(share());
// or shareReplay(1) if you want to ensure every subscriber get the last value event if they will subscribe later;
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
https://stackblitz.com/edit/typescript-qpnbkm?embed=1&file=index.ts
这是一个类似于 learn-rxjs
的示例RxJS 实际上应该在
tap
中放置一个索引,但在那之前,请使用它而不是 tap
:
skipWhile((x, i) => {
if (i !== 0) /* Do something */;
return false;
}),
(更新我之前的错误答案)
根据 cartant 的评论和提供的链接,他已经完成了创建一个执行此操作的操作符的工作,它位于“rxjs-etc”包中。基于他的操作员的解决方案是安装'rxjs-etc',然后:
import { initial } from 'rxjs-etc/operators';
observable$.pipe(
initial(src$ => src$.pipe(tap(() => {/* execute only on first value */})))
).
subscribe(() => {
// .....
})
工作StackBlitz示例。