我有多种类型的事件流。在新订阅中,我想为每种类型(唯一一种)重复最新事件,然后继续任何新事件。
假设我有一个事件:DoorStateChange(door_id, new_state), LockStateChange(lock_id, new_state), Booted(boot_time).
从每个事件中,我可以创建一个用于重复数据删除的字符串,
DoorStateChange/${door_id}
,等等
所以如果我收到事件:
Booted(129048124093)
DoorStateChange(door1, Opened)
DoorStateChange(door2, Closed)
DoorStateChange(door1, Closed)
LockStateChange(door1, Locked)
LockStateChange(door2, Locked)
LockStateChange(door1, Opened)
如果我有一个新订阅者,我希望它立即收到以下内容,然后是所有其他事件:
Booted(129048124093)
DoorStateChange(door2, Closed)
DoorStateChange(door1, Closed)
LockStateChange(door2, Locked)
LockStateChange(door1, Opened)
如何实现?我需要 shareReplay,带有某种过滤器,所以旧值被覆盖......或者更好的是,指定每种类型/标识符重播的消息数。
• Webpack 和 rxjs - “你可能需要一个合适的加载器”错误
• 如果ReplaySubject在RXJS中没有发出任何值,则startWith操作符。
• 如何用 rxjs 一个接一个地进行 2 个异步调用,而不管第一个是成功还是失败
• 使用 Jest 和 TestScheduler 测试无限可观察量
• 为什么不是所有流都在 combineLatest 中发出值?
• 防止一个主题的多个订阅者触发 RxJS 中每个订阅者的部分主题管道
• 防止来自 Pipe(takeUntil()) 的多个 API 调用
• How to map an http .get observable response to handle object of arrays or array of objects