我有一个名为“poll”的函数,它使我能够轮询 API 端点。
在某些时候,我希望通过手动操作来停止轮询,这里名为“manualForceStop”,它是一个以“false”开头初始化的BehaviorSubject。在某些时候(例如,当用户触发某个操作时),我会将 manualForceStop 传递为 true。此时,我想取消订阅并抛出特定错误,例如“throwError(() => 'MANUAL_FORCE_STOP'”。
我不知道该怎么办,因为重试总是......重试。有什么想法吗?
const CONVERT_SECONDS_TO_MS = 1000;
interface PollCmd<TApiArgs, TApiResponse> {
intervalInSeconds: number;
timeoutInSeconds: number;
apiCall: (arg?: TApiArgs) => Observable<TApiResponse>;
shouldContinuePolling: (arg: TApiResponse) => boolean;
manualForceStop: BehaviorSubject<boolean>;
}
export function poll<TApiArgs, TApiResponse>(
input: PollCmd<TApiArgs, TApiResponse>
): Observable<TApiResponse> {
return input.apiCall().pipe(
switchMap((apiResponse) => {
if (input.shouldContinuePolling(apiResponse)) {
return throwError(() => 'CONDITION_NOT_REACHED');
}
return of(apiResponse);
}),
retry({ delay: input.intervalInSeconds * CONVERT_SECONDS_TO_MS }),
timeout({ first: input.timeoutInSeconds * CONVERT_SECONDS_TO_MS })
/*
At some point, manualForceStop passes from "false" to "true" and then,
I would like to unsubscribe and throw a specific error
*/
);
查看
retry
的 RetryConfig#delay
属性,我注意到您可以传递函数而不是数字。
const CONDITION_NOT_REACHED = 'CONDITION_NOT_REACHED';
export function poll<TApiArgs, TApiResponse>(
input: PollCmd<TApiArgs, TApiResponse>
): Observable<TApiResponse> {
return input.apiCall().pipe(
switchMap((apiResponse) => {
if (input.shouldContinuePolling(apiResponse)) {
return throwError(() => CONDITION_NOT_REACHED);
}
return of(apiResponse);
}),
retry({ delay: (err, retryCount) => {
if (err === CONDITION_NOT_REACHED) {
return throwError(() => CONDITION_NOT_REACHED);
}
return timer(input.intervalInSeconds * CONVERT_SECONDS_TO_MS).pipe(take(1));
} }),
timeout({ first: input.timeoutInSeconds * CONVERT_SECONDS_TO_MS })
/*
At some point, manualForceStop passes from "false" to "true" and then,
I would like to unsubscribe and throw a specific error
*/
);
}
从传递给
delay
的函数返回的 observable 具有以下属性: