rxjs自定义重试,带有自动递增延迟的策略无法正常工作

问题描述 投票:0回答:1

我正在尝试创建自定义的retryWhen策略,该策略尝试X次之间延迟retry N次,然后失败。在某种程度上,learnrxjs.io示例正是我想要的。

很遗憾,此代码存在问题,我似乎无法弄清楚如何解决。就我而言,可观察对象可能会随机失败-您可以尝试2 successful次,然后尝试2 unsuccessful次。一段时间后,订阅会自动完成,因为retryAttempt会超出最大值,尽管实际上并没有发生。

为了更好地理解这个问题,我创建了一个StackBlitz

响应将是:

  Attempt 1: retrying in 1000ms
  0
  1
  Attempt 2: retrying in 2000ms
  Attempt 3: retrying in 3000ms
  0
  1
  We are done!

但实际上应该是

  Attempt 1: retrying in 1000ms
  0
  1
  Attempt 1: retrying in 1000ms <-- notice counter starts from 1
  Attempt 2: retrying in 2000ms
  0
  1
  Attempt 1: retrying in 1000ms <-- notice counter starts from 1
  0
  1
  Attempt 1: retrying in 1000ms <-- notice counter starts from 1
  Attempt 2: retrying in 2000ms
  0
  1
  ... forever

我觉得我在这里想念什么。

rxjs rxjs6
1个回答
0
投票

我认为文档中给出的示例是为Observable编写的,它仅发出一次然后完成,例如http get。假设如果您想获取更多数据,那么您将再次订阅,这将重置genericRetryStrategy中的计数器。但是,如果您现在想对长时间运行的可观察对象应用相同的策略,除非它给出错误,否则该对象的流将不会完成(例如,您使用interval()),那么您需要修改genericRetryStrategy()被告知何时需要重置计数器。

这可以通过多种方式完成,我在StackBlitz中给出了一个简单的示例,其依据是您说的要完成的工作。请注意,我也略微更改了您的逻辑,以更符合您说的尝试,即“成功尝试2次,然后尝试2次失败”。但是,重要的位正在修改抛出到genericRetryStrategy()中的错误对象,以传达失败尝试的当前计数,以便可以适当地做出反应。

这里是出于完整性而复制的代码:

import { timer, interval, Observable, throwError } from 'rxjs';
import { map, switchMap, tap, retryWhen, delayWhen, mergeMap, shareReplay, finalize, catchError } from 'rxjs/operators';

console.clear();

interface Err {
  status?: number;
  msg?: string;
  int: number;
}

export const genericRetryStrategy = ({
  maxRetryAttempts = 3,
  scalingDuration = 1000,
  excludedStatusCodes = []
}: {
  maxRetryAttempts?: number,
  scalingDuration?: number,
  excludedStatusCodes?: number[]
} = {}) => (attempts: Observable<any>) => {
  return attempts.pipe(
    mergeMap((error: Err) => {
      // i here does not reset and continues to increment?
      const retryAttempt = error.int;

      // if maximum number of retries have been met
      // or response is a status code we don't wish to retry, throw error
      if (
        retryAttempt > maxRetryAttempts ||
        excludedStatusCodes.find(e => e === error.status)
      ) {
        return throwError(error);
      }
      console.log(
        `Attempt ${retryAttempt}: retrying in ${retryAttempt *
          scalingDuration}ms`
      );
      // retry after 1s, 2s, etc...
      return timer(retryAttempt * scalingDuration);
    }),
    finalize(() => console.log('We are done!'))
  );
};

let int = 0;
let err: Err = {int: 0};
//emit value every 1s
interval(1000).pipe(
  map((val) => {
    if (val > 1) {
      //error will be picked up by retryWhen
      int++;
      err.msg = "equals 1";
      err.int = int;
      throw err;
    }
    if (val === 0 && int === 1) {
      err.msg = "greater than 2";
      err.int = 2;
      int=0;
      throw err;
    }
    return val;
  }),
  retryWhen(genericRetryStrategy({
    maxRetryAttempts: 3,
    scalingDuration: 1000,
    excludedStatusCodes: [],
  }))
).subscribe(val => {
  console.log(val)
});

对我来说,这仍然是非常必要的,但是在不了解您要更深入地解决的问题的情况下,我目前无法想到一种更具声明性的方法...

© www.soinside.com 2019 - 2024. All rights reserved.