我有一个用例,其中我有一个提供服务“地址”的源。然后我想将该地址映射到从该服务传输数据的源。现在,如果我得到一个新地址,我想为该地址创建一个新源,并继续从第二个源进行流式传输,同时停止前一个源(因为无法保证最后一个地址会发生什么) /服务/来源)。
到目前为止,我发现
flatMapConcat
是最接近我需要的,但我想停止以前的来源并保留最新的来源。
在某种程度上我希望:
AddressSource
.flatMatLatest(address => StreamingSource.from(address))
// at this point we should be receiving elements produced by the latest StreamingSource
.to(Sink...)
诚然,这只是在心里编译的,但这样的东西应该有效:
sourceOfAddresses
.statefulMap(() => Option.empty[KillSwitch])(
{ (lastEmittedKillswitch, address) =>
lastEmittedKillswitch.foreach(_.shutdown)
// this just happens to match the desired ordering of the tuple, woohoo!
buildSourceForAddress(address)
.viaMat(Killswitches.single)(Keep.right)
.preMaterialize() // will need the materializer in implicit scope for this to work
},
_ => None
)
.detach // in combination with the shutdown above, ensure perpetual demand
.flatMapConcat(identity)
基本上:
flatMapConcat
的每个源,并预先实现该源,以便您可以访问终止开关flatMapConcat
killswitch 将完成发射的源,导致
flatMapConcat
需要另一个源,该源在 detach
(基本上是 1 的缓冲区)中随时可用(因为它在触发killswitch 后立即发射)。由于 detach
现在是空的,因此需求通过 statefulMap
到 sourceOfAddresses
发出信号。