我需要使用 rxjs 实现 HTTP 长轮询(在 Angular 环境中)。挑战在于我需要在每次调用时更改请求参数。更具体地说:我需要更改 from/to 值才能使其滚动。例如,始终从当前时间倒退 1 分钟。
现在我有以下实现:
const requestParameter$ = new BehaviorSubject<MetricAggregationParams>(initialRequestParameter);
this.metricsService
.searchAggregatedMetrics(requestParameter$)
.pipe(
tap((metricInstancesResult) => {
// do something with the result
}),
delay(3000),
tap(() => {
requestParameter$.next({
...requestParameter$.value,
from: DateTime.now()
.minus({ minutes: timerange.timerangeInMinutes as number })
.toISO(),
to: DateTime.now().toISO()
});
})
)
.subscribe();
})
searchAggregatedMetrics(requestParameter$: BehaviorSubject<MetricAggregationParamsDto>) {
return requestParameter$.pipe(
concatMap((requestParameter) =>
this.http.post<MetricAggregationResult>(`REST_URL`, requestParameter)
)
);
}
以下是一些限制:
有没有办法将长轮询逻辑全部放在 searchAggregateMetrics 方法中?
如果我对问题的理解正确,那么你面临着某种递归问题。 rxJs 中的递归是通过
expand
运算符解决的。
您的问题的一个解决方案可能是这样的
restService(0).pipe(
delay(300),
map(resp => {
console.log("do stuff with resp " + resp)
}),
map(() => {
console.log("prepare next input " + counter++)
return counter
}),
// expand is an operator that takes as input a function that
// returns an Observable
expand(input => {
return restService(counter).pipe(
delay(300),
map(resp => {
console.log("do stuff with resp " + resp)
}),
map(() => {
console.log("prepare next input " + counter++)
return counter
}),
)
}),
// we use take just to finish the iteration after a certain number of calls
take(10)
).subscribe()
// counter used to simulate the fact that we change input at every call
let counter = 0
// function to simulate a rest service call
function restService(input: number) {
return of(input).pipe(
tap(input => console.log("Input received " + input)),
map(input => {
const resp = "response " + input
return resp
})
)
}
上面的代码显然有重复,因此可以像这样编码得更优雅一点(但可能不太清楚)
function restServiceOnSteroids(input: number) {
return restService(input).pipe(
delay(300),
map(resp => {
console.log("do stuff with resp " + resp)
}),
map(() => {
console.log("prepare next input " + counter++)
return counter
})
)
}
restServiceOnSteroids(counter).pipe(
expand(input => restServiceOnSteroids(input)),
take(7)
).subscribe()
这里有一个 stackblitz 重现了这个解决方案