当我的史诗在ofType
点回应时,我需要等到state$.value.foo
变成true
。一旦它是真的,那么我希望它到达from
,它做了一个获取和重要的东西。我是这样做的:
action$.pipe(
ofType(START_CONTINUE_SESSION),
concat(
iif(
() => state$.value.foo === true,
EMPTY,
action$.pipe(
filter(() => state$.value.foo === true),
)
),
from(fetch(...)).pipe(
// ... do important stuff here BUT only after state$.value.foo has become true
)
)
)
发生的事情是我发出了超大量的动作,它永远不会到达from(fetch))
。
首先,请注意qxxswpoi运算符在RxJS 6中被弃用(这将是从concat
导入的运算符)。但是,用于创建可观察对象的函数rxjs/operators
不会被弃用(这将是从concat
导入的函数)。我建议使用不被弃用的运算符。
其次,您当前的方法存在一些问题。
rxjs
上面的过滤操作匹配类型“START_CONTINUE_SESSION”,并允许它们传递回Redux存储。这是因为action$.pipe(
ofType(START_CONTINUE_SESSION),
concat(
...
运算符允许源事件通过并等待前一个observable在开始下一个observable之前完成。但是,因为redux-observable动作流永远不会完成,所以concat
永远不会开始下一个observable!从旧的RxJS文档中查看以下大理石图:
concat
如图所示,源事件通过。使用redux-observable,这意味着您的“START CONTINUE SESSION”操作将陷入永无止境的重复循环。
concat
您首先要检查商店中 ...
iif(
() => state$.value.foo === true,
EMPTY, // I've assumed that this is equivalent to `empty()`
action$.pipe(
filter(() => state$.value.foo === true),
),
),
...
的当前值。如果它的值是foo
,则不会发出更多信息(仅在此特定步骤中),接下来将开始true
。如果其值为from
,则会创建新的操作流订阅。对于调度的每个未来操作,这将检查商店中false
的当前值。当它的值最终成为foo
时,流入的动作(可能是任何动作!)被允许传回Redux商店。但请注意,此订阅永远不会结束!只要true
仍然是foo
,你将再次拥有一个永无止境的行动循环并直接返回Redux商店。
你应该订阅true
而不是订阅action$
并检查状态。以下示例略有不同,但我认为它显示了实现目标的方法。这等待初始动作(state$
),然后等待状态有START_CONTINUE_SESSION
,然后将动作和状态发送到foo === true
,你可以像正常一样处理它(获取,发送其他动作等)。如果您不需要状态副本,那么可以忽略它。
mergeMap
至于回应最初的行动(export const epic = (action$, state$) =>
action$.pipe(
ofType(START_CONTINUE_SESSION),
withLatestFrom(state$),
exhaustMap(([action, state]) =>
state.foo === true
? of([action, state])
: state$.pipe(
mergeMap(state =>
state.foo === true
? of([action, state])
: empty()
),
first(),
)
),
mergeMap(([action, state]) =>
// here we have triggering action and state with foo === true
from(fetch(...)).pipe(
// ... do important stuff here
)
),
)
),我在上面的例子中选择了START_CONTINUE_SESSION
。可能的替代品包括exhaustMap
,concatMap
和mergeMap
。您应该选择最适合您的用例的运算符:
switchMap
- 监听所有操作并按顺序运行多个工作流程。concatMap
- 听取第一个动作并等到完成工作流程后再接受另一个动作。exhaustMap
- 监听所有操作并并行运行多个工作流程。mergeMap
- 听取所有行动,但一次只运行一个。在接收新操作时取消任何先前的工作流程。