我正在尝试寻找一种方法,在先前的可观察值完成之前,如何在concatMap输入上缓冲数据。
最好在我的真实示例中进行解释。
我有一个使用redux-observable的应用程序。
有一个动作validate
const validate = updatedDataIds => ({
type: VALIDATE,
payload: { updatedDataIds },
});
我想缓冲updatedDataIds数组并减少对longTimePostAjaxFunction
的调用。
export const validateEpic = (action$, state$, { longTimePostAjaxFunction }) =>
action$.pipe(
ofType(VALIDATE),
map(({ payload: { updatedDataIds } }) => updatedDatapointIds),
// I want to buffer longTimePostAjaxFunction here until previous observable (longTimePostAjaxFunction) completes
map(flatten),
concatMap(updatedDataIds =>
longTimePostAjaxFunction('validate/url', {
updatedDataIds,
}).pipe(
map(validateFulfilled),
),
),
);
您知道如何解决此问题吗?
解决问题的一种方法是每次longTimePostAjaxFunction
解析时发出一个流。例如(尽管可能有更好的选择),您可以将id流缓冲在action$.pipe(ofType('READY_FOR_IDS')
上,并将ready操作压平为AJAX结果流。
export const validateEpic = (action$, state$, { longTimePostAjaxFunction }) =>
action$.pipe(
ofType(VALIDATE),
map(({ payload: { updatedDataIds } }) => updatedDatapointIds),
buffer(action$.pipe(ofType('READY_FOR_IDS')),
map(flatten),
concatMap(updatedDataIds =>
longTimePostAjaxFunction('validate/url', {
updatedDataIds,
}).pipe(
map(validateFulfilled)
),
),
concatMap(action => [action, { type: 'READY_FOR_IDS' }])
);
我当前(糟糕的)解决方案是:
缓冲区:
..
map(({ payload: { updatedDataIds } }) => updatedDataIds),
buffer(action$.pipe(ofType(VALIDATION_START))),
map(flatten),
..
缓冲逻辑:
export const pendingValidate = action$ =>
mergeMap(() =>
action$.pipe(
ofType(VALIDATE, VALIDATE_FULFILLED),
map(({ type }) => type),
scan(
([gate], currentAction) => {
if (currentAction === VALIDATE) {
if (gate === 'open') return ['closed', true];
return ['waiting', false];
}
if (gate === 'waiting' && currentAction === VALIDATE_FULFILLED) {
return ['closed', true];
}
return ['open', false];
},
['open', false],
),
filter(([, shouldValidate]) => shouldValidate),
map(() => ({ type: VALIDATION_START })),
),
),
);