强制一个可观察对象基于另一个可观察对象抛出

问题描述 投票:0回答:1

我有一个名为“poll”的函数,它使我能够轮询 API 端点。

在某些时候,我希望通过手动操作来停止轮询,这里名为“manualForceStop”,它是一个以“false”开头初始化的BehaviorSubject。在某些时候(例如,当用户触发某个操作时),我会将 manualForceStop 传递为 true。此时,我想取消订阅并抛出特定错误,例如“throwError(() => 'MANUAL_FORCE_STOP'”。

我不知道该怎么办,因为重试总是......重试。有什么想法吗?

const CONVERT_SECONDS_TO_MS = 1000;
interface PollCmd<TApiArgs, TApiResponse> {
  intervalInSeconds: number;
  timeoutInSeconds: number;
  apiCall: (arg?: TApiArgs) => Observable<TApiResponse>;
  shouldContinuePolling: (arg: TApiResponse) => boolean;
  manualForceStop: BehaviorSubject<boolean>;
}

export function poll<TApiArgs, TApiResponse>(
  input: PollCmd<TApiArgs, TApiResponse>
): Observable<TApiResponse> {
  return input.apiCall().pipe(
    switchMap((apiResponse) => {
      if (input.shouldContinuePolling(apiResponse)) {
        return throwError(() => 'CONDITION_NOT_REACHED');
      }

      return of(apiResponse);
    }),
    retry({ delay: input.intervalInSeconds * CONVERT_SECONDS_TO_MS }),
    timeout({ first: input.timeoutInSeconds * CONVERT_SECONDS_TO_MS })
    /* 
      At some point, manualForceStop passes from "false" to "true" and then,
      I would like to unsubscribe and throw a specific error
    */
  );
typescript rxjs
1个回答
0
投票

查看

retry
RetryConfig#delay
属性,我注意到您可以传递函数而不是数字。

const CONDITION_NOT_REACHED = 'CONDITION_NOT_REACHED';
export function poll<TApiArgs, TApiResponse>(
  input: PollCmd<TApiArgs, TApiResponse>
): Observable<TApiResponse> {
  return input.apiCall().pipe(
    switchMap((apiResponse) => {
      if (input.shouldContinuePolling(apiResponse)) {
        return throwError(() => CONDITION_NOT_REACHED);
      }

      return of(apiResponse);
    }),
    retry({ delay: (err, retryCount) => {
      if (err === CONDITION_NOT_REACHED) {
        return throwError(() => CONDITION_NOT_REACHED);
      }

      return timer(input.intervalInSeconds * CONVERT_SECONDS_TO_MS).pipe(take(1));
    } }),
    timeout({ first: input.timeoutInSeconds * CONVERT_SECONDS_TO_MS })
    /* 
      At some point, manualForceStop passes from "false" to "true" and then,
      I would like to unsubscribe and throw a specific error
    */
  );
}

从传递给

delay
的函数返回的 observable 具有以下属性:

  • 当它发出时,会发生重试
  • 如果抛出,错误将被传播
  • 如果不发出,错误将不会被传播并且它会毫无错误地完成
© www.soinside.com 2019 - 2024. All rights reserved.