等待状态改变导致重复动作发出

问题描述 投票:1回答:1

当我的史诗在ofType点回应时,我需要等到state$.value.foo变成true。一旦它是真的,那么我希望它到达from,它做了一个获取和重要的东西。我是这样做的:

action$.pipe(
    ofType(START_CONTINUE_SESSION),
    concat(
        iif(
            () => state$.value.foo === true,
            EMPTY,
            action$.pipe(
                filter(() => state$.value.foo === true),
            )
        ),
        from(fetch(...)).pipe(
            // ... do important stuff here BUT only after state$.value.foo has become true
        )
    )
)

发生的事情是我发出了超大量的动作,它永远不会到达from(fetch))

rxjs redux-observable
1个回答
1
投票

首先,请注意qxxswpoi运算符在RxJS 6中被弃用(这将是从concat导入的运算符)。但是,用于创建可观察对象的函数rxjs/operators不会被弃用(这将是从concat导入的函数)。我建议使用不被弃用的运算符。

其次,您当前的方法存在一些问题。

rxjs

上面的过滤操作匹配类型“START_CONTINUE_SESSION”,并允许它们传递回Redux存储。这是因为action$.pipe( ofType(START_CONTINUE_SESSION), concat( ... 运算符允许源事件通过并等待前一个observable在开始下一个observable之前完成。但是,因为redux-observable动作流永远不会完成,所以concat永远不会开始下一个observable!从旧的RxJS文档中查看以下大理石图:

concat

如图所示,源事件通过。使用redux-observable,这意味着您的“START CONTINUE SESSION”操作将陷入永无止境的重复循环。

即使行动流结束并且marble diagram for concat operator开始下一个可观测量,还有其他问题:

concat

您首先要检查商店中 ... iif( () => state$.value.foo === true, EMPTY, // I've assumed that this is equivalent to `empty()` action$.pipe( filter(() => state$.value.foo === true), ), ), ... 的当前值。如果它的值是foo,则不会发出更多信息(仅在此特定步骤中),接下来将开始true。如果其值为from,则会创建新的操作流订阅。对于调度的每个未来操作,这将检查商店中false的当前值。当它的值最终成为foo时,流入的动作(可能是任何动作!)被允许传回Redux商店。但请注意,此订阅永远不会结束!只要true仍然是foo,你将再次拥有一个永无止境的行动循环并直接返回Redux商店。


你应该订阅true而不是订阅action$并检查状态。以下示例略有不同,但我认为它显示了实现目标的方法。这等待初始动作(state$),然后等待状态有START_CONTINUE_SESSION,然后将动作和状态发送到foo === true,你可以像正常一样处理它(获取,发送其他动作等)。如果您不需要状态副本,那么可以忽略它。

mergeMap

至于回应最初的行动(export const epic = (action$, state$) => action$.pipe( ofType(START_CONTINUE_SESSION), withLatestFrom(state$), exhaustMap(([action, state]) => state.foo === true ? of([action, state]) : state$.pipe( mergeMap(state => state.foo === true ? of([action, state]) : empty() ), first(), ) ), mergeMap(([action, state]) => // here we have triggering action and state with foo === true from(fetch(...)).pipe( // ... do important stuff here ) ), ) ),我在上面的例子中选择了START_CONTINUE_SESSION。可能的替代品包括exhaustMapconcatMapmergeMap。您应该选择最适合您的用例的运算符:

  • switchMap - 监听所有操作并按顺序运行多个工作流程。
  • concatMap - 听取第一个动作并等到完成工作流程后再接受另一个动作。
  • exhaustMap - 监听所有操作并并行运行多个工作流程。
  • mergeMap - 听取所有行动,但一次只运行一个。在接收新操作时取消任何先前的工作流程。
© www.soinside.com 2019 - 2024. All rights reserved.