Akka Streams flatMapConcat 在创建新源时停止以前的源

问题描述 投票:0回答:1

我有一个用例,其中我有一个提供服务“地址”的源。然后我想将该地址映射到从该服务传输数据的源。现在,如果我得到一个新地址,我想为该地址创建一个新源,并继续从第二个源进行流式传输,同时停止前一个源(因为无法保证最后一个地址会发生什么) /服务/来源)。

到目前为止,我发现

flatMapConcat
是最接近我需要的,但我想停止以前的来源并保留最新的来源。

在某种程度上我希望:

AddressSource
  .flatMatLatest(address => StreamingSource.from(address))
  // at this point we should be receiving elements produced by the latest StreamingSource
  .to(Sink...)
akka akka-stream
1个回答
0
投票

诚然,这只是在心里编译的,但这样的东西应该有效:

sourceOfAddresses
  .statefulMap(() => Option.empty[KillSwitch])(
    { (lastEmittedKillswitch, address) =>
      lastEmittedKillswitch.foreach(_.shutdown)

      // this just happens to match the desired ordering of the tuple, woohoo!
      buildSourceForAddress(address)
        .viaMat(Killswitches.single)(Keep.right)
        .preMaterialize()  // will need the materializer in implicit scope for this to work
    },
    _ => None
  )
  .detach // in combination with the shutdown above, ensure perpetual demand
  .flatMapConcat(identity)

基本上:

  • 使用终止开关构建要传递给
    flatMapConcat
    的每个源,并预先实现该源,以便您可以访问终止开关
  • 在传递到
    flatMapConcat
  • 之前保存该终止开关
  • 当您获得新地址时,如果有已保存的终止开关,请激活它

killswitch 将完成发射的源,导致

flatMapConcat
需要另一个源,该源在
detach
(基本上是 1 的缓冲区)中随时可用(因为它在触发killswitch 后立即发射)。由于
detach
现在是空的,因此需求通过
statefulMap
sourceOfAddresses
发出信号。

© www.soinside.com 2019 - 2024. All rights reserved.