使用 rxjs 更改请求参数的 HTTP 长轮询

问题描述 投票:0回答:1

我需要使用 rxjs 实现 HTTP 长轮询(在 Angular 环境中)。挑战在于我需要在每次调用时更改请求参数。更具体地说:我需要更改 from/to 值才能使其滚动。例如,始终从当前时间倒退 1 分钟。

现在我有以下实现:

const requestParameter$ = new BehaviorSubject<MetricAggregationParams>(initialRequestParameter);

this.metricsService
  .searchAggregatedMetrics(requestParameter$)
  .pipe(
    tap((metricInstancesResult) => {
      // do something with the result
    }),
    delay(3000),
    tap(() => {
      requestParameter$.next({
        ...requestParameter$.value,
        from: DateTime.now()
          .minus({ minutes: timerange.timerangeInMinutes as number })
          .toISO(),
        to: DateTime.now().toISO()
      });
    })
  )
  .subscribe();
})

searchAggregatedMetrics(requestParameter$: BehaviorSubject<MetricAggregationParamsDto>) {
  return requestParameter$.pipe(
    concatMap((requestParameter) =>
      this.http.post<MetricAggregationResult>(`REST_URL`, requestParameter)
    )
  );
}

以下是一些限制:

  • 第一个请求应立即开始(无延迟)
  • 下一个请求应在上一个请求完成后 + 3000ms 开始

有没有办法将长轮询逻辑全部放在 searchAggregateMetrics 方法中?

angular rxjs long-polling
1个回答
0
投票

如果我对问题的理解正确,那么你面临着某种递归问题。 rxJs 中的递归是通过

expand
运算符解决的。

您的问题的一个解决方案可能是这样的

restService(0).pipe(
  delay(300),
  map(resp => {
    console.log("do stuff with resp " + resp)
  }),
  map(() => {
    console.log("prepare next input " + counter++)
    return counter
  }),
  // expand is an operator that takes as input a function that
  // returns an Observable
  expand(input => {
    return restService(counter).pipe(
      delay(300),
      map(resp => {
        console.log("do stuff with resp " + resp)
      }),
      map(() => {
        console.log("prepare next input " + counter++)
        return counter
      }),
    )
  }),
  // we use take just to finish the iteration after a certain number of calls
  take(10)
).subscribe()

// counter used to simulate the fact that we change input at every call
let counter = 0
// function to simulate a rest service call
function restService(input: number) {
  return of(input).pipe(
    tap(input => console.log("Input received " + input)),
    map(input => {
      const resp = "response " + input
      return resp
    })
  )
}

上面的代码显然有重复,因此可以像这样编码得更优雅一点(但可能不太清楚)

function restServiceOnSteroids(input: number) {
  return restService(input).pipe(
    delay(300),
    map(resp => {
      console.log("do stuff with resp " + resp)
    }),
    map(() => {
      console.log("prepare next input " + counter++)
      return counter
    })
  )
}

restServiceOnSteroids(counter).pipe(
  expand(input => restServiceOnSteroids(input)),
  take(7)
).subscribe()

这里有一个 stackblitz 重现了这个解决方案

© www.soinside.com 2019 - 2024. All rights reserved.