rxjs 仅在第一次执行 tap

问题描述 投票:0回答:8

我只想在获得第一个发出的值时执行 tap()

类似:

Observable
  .pipe(
     tap(() => { /* execute only when I get the first emitted value */ })
  )
  .subscribe(() => {
     // .....
  })
javascript rxjs
8个回答
34
投票

您可以在地图运算符中使用索引,例如

concatMap
。与其他方法不同,这对所选索引是完全灵活的。假设您想点击第二次发射
index === 1
或任何谓词,例如
index % 2 === 0

// these are because of using rxjs from CDN in code snippet, ignore them
const {of, interval} = rxjs;
const {take, tap, concatMap} = rxjs.operators;


// main code
const stream = interval(250).pipe(take(4))

stream.pipe(
  concatMap((value, index) => index === 0
    ? of(value).pipe(
        tap(() => console.log('tap'))
      )
    : of(value)
  )
)
.subscribe(x => console.log(x));
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/rxjs.umd.js"></script>


18
投票

如果我正确地理解了您的想法,您只想在流订阅开始时而不是其他时间执行

tap()
这是我的自定义运算符:

import { Observable, of } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';

export function startWithTap<T>(callback: () => void) {
  return (source: Observable<T>) =>
    of({}).pipe(tap(callback), switchMap((o) => source));
}

例如,该运算符的用法是:

this.api.getData().pipe(
  startWithTap(() => this.loading.start()),
)

这是我的实际代码示例,当有人 订阅 api 服务创建的 Observable(通过 httpClient)。


更新

使用这个代替上面的实现,因为这个只使用

defer
,而不是使用
of
tap
switchMap

export function startWithTap<T>(callback: () => void) {
  return (source: Observable<T>) =>
    defer(() => {
      callback();
      return source;
    });
}

8
投票

我喜欢jal的回答的方法,并建议将其包装在自己的运算符中:

export function tapOnce<T>(tapFn: (t: T) => void, tapIndex = 0): OperatorFunction<T, T> {
  return source$ => source$.pipe(concatMap((value, index) => {
    if (index === tapIndex) {
      tapFn(value);
    }
    return of(value);
  }));
}

用法如下:

stream.pipe(tapOnce(() => console.log('tapping once'), 1));

这甚至可以进一步抽象为一个运算符,该运算符采用一个函数来确定是否应该根据给定的值/索引进行点击:

export function tapWhen<T>(tapFn: (t: T) => void, evaluateFn: (index: number, t: T) => boolean): OperatorFunction<T, T> {
  return source$ => source$.pipe(concatMap((value, index) => {
    if (evaluateFn(index, value)) {
      tapFn(value);
    }
    return of(value);
  }));
}

4
投票

如果有人感兴趣,这里有一个tapN额外简单实现。因此它会为每次发射执行指定的回调函数,直到发射数量等于nEmissions。为了仅对第一个元素执行 tap() 函数,您可以执行 tapN(1),但您也可以使用例如 tapN(3) 来执行 3 个首次发射的 tap。

/* Executes the specified callback function for each emission until the number of emissions is equal to nEmissions*/
export const tapN = <T>(nEmissions, callback: (T) => void) => (source$: Observable<T>): Observable<T> =>
    defer(() => {
        let counter = 0;
        return source$.pipe(tap((item) => {
            if (counter < nEmissions) {
                callback(item);
                counter++;
            }
        }));
    });

在您的代码中:

Observable
  .pipe(
     tapN(1, () => { /* this code would be only executed on the first emitted value */ })
  )
  .subscribe(() => {
     // .....
  })

3
投票

除了已经提到的选项之外,您还可以使用

multicast

multicast(new Subject(), s => concat(
  s.pipe(
    take(1),
    tap(v => console.log('tap', v)),
  ),
  s
)

现场演示:https://stackblitz.com/edit/rxjs-shvuxm


0
投票

您可以像下面这样分享()您的主要 Observable:

import { timer, of, BehaviorSubject, interval } from 'rxjs';
import { tap, mapTo, share, shareReplay, } from 'rxjs/operators';

const source$ = timer(1000)
.pipe(
  tap((v) => console.log('SIDE EFFECT')),
  mapTo('RESULT')
)
const sharedSource$ = source$.pipe(share());
// or shareReplay(1) if you want to ensure every subscriber get the last value event if they will subscribe later;

sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);

https://stackblitz.com/edit/typescript-qpnbkm?embed=1&file=index.ts

这是一个类似于 learn-rxjs

的示例

0
投票

RxJS 实际上应该在

tap
中放置一个索引,但在那之前,请使用它而不是
tap
:

skipWhile((x, i) => {
    if (i !== 0) /* Do something */;
    return false;
}),

-1
投票

(更新我之前的错误答案)

根据 cartant 的评论和提供的链接,他已经完成了创建一个执行此操作的操作符的工作,它位于“rxjs-etc”包中。基于他的操作员的解决方案是安装'rxjs-etc',然后:

import { initial } from 'rxjs-etc/operators';

observable$.pipe(
    initial(src$ => src$.pipe(tap(() => {/* execute only on first value */})))
).
subscribe(() => {
    // ..... 
})

工作StackBlitz示例。

© www.soinside.com 2019 - 2024. All rights reserved.