Akka流-MergeLatest的默认值

问题描述 投票:0回答:2

documentation的官方MergeLatest声明:

[MergeLatest为从某个输入流发出的每个元素发出列表,但仅在每个输入流发出至少一个元素之后。

我的问题是:可以绕开它吗?例如,我们是否可以提供一个默认值,使其从任何输入流中至少接收到一个元素后立即开始生成列表?

以下应该是新行为:

(1,0,0)
(2,0,0)
(2,1,0)
(2,1,1)
(2,1,2)

而不是:

(2,1,1)
(2,1,2)

因为我也需要将这些第一个列表也推送到输出流中

scala akka akka-stream
2个回答
0
投票

对于每个传入流,您可以使用Source.single(0).concat发出零,然后发出其余流:

def withInitialValue[A](source: Source[A, NotUsed], a: A): Source[A, NotUsed] =
  Source.single(a).concat(source)

0
投票

[很遗憾,mergeLatest没有提供这种选项。而且似乎没有任何Stream运算符可以轻松地做到这一点。一种方法是将MergeLatest用于特定需求。好消息是,由于相关代码实现是MergeLatest的标准GraphStage,因此必要的代码更改非常简单。

UniformFanInShape

在这种情况下,只需要很少的代码更改。除了要在数组import akka.stream.scaladsl._ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.stream.{ Attributes, Inlet, Outlet, UniformFanInShape } import scala.collection.immutable object MergeLatestWithDefault { def apply[T](inputPorts: Int, default: T, eagerComplete: Boolean = false): GraphStage[UniformFanInShape[T, List[T]]] = new MergeLatestWithDefault[T, List[T]](inputPorts, default, eagerComplete)(_.toList) } final class MergeLatestWithDefault[T, M](val inputPorts: Int, val default: T, val eagerClose: Boolean)(buildElem: Array[T] => M) extends GraphStage[UniformFanInShape[T, M]] { require(inputPorts >= 1, "input ports must be >= 1") val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("MergeLatestWithDefault.in" + i)) val out: Outlet[M] = Outlet[M]("MergeLatestWithDefault.out") override val shape: UniformFanInShape[T, M] = UniformFanInShape(out, in: _*) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { private val activeStreams: java.util.HashSet[Int] = new java.util.HashSet[Int]() private var runningUpstreams: Int = inputPorts private def upstreamsClosed: Boolean = runningUpstreams == 0 private val messages: Array[Any] = Array.fill[Any](inputPorts)(default) override def preStart(): Unit = in.foreach(tryPull) in.zipWithIndex.foreach { case (input, index) => setHandler( input, new InHandler { override def onPush(): Unit = { messages.update(index, grab(input)) activeStreams.add(index) emit(out, buildElem(messages.asInstanceOf[Array[T]])) tryPull(input) } override def onUpstreamFinish(): Unit = { if (!eagerClose) { runningUpstreams -= 1 if (upstreamsClosed) completeStage() } else completeStage() } }) } override def onPull(): Unit = { var i = 0 while (i < inputPorts) { if (!hasBeenPulled(in(i))) tryPull(in(i)) i += 1 } } setHandler(out, this) } override def toString = "MergeLatestWithDefault" } 中填充default的附加参数外,唯一的变化是messages中的emit不再是有条件的。

正在测试:

onPush

作为奖励,虽然import akka.actor.ActorSystem object CustomMerge { def main(args: Array[String]): Unit = { implicit val system = ActorSystem("system") val s1 = Source(1 to 3) val s2 = Source(11 to 13).throttle(1, 50.millis) val s3 = Source(101 to 103).throttle(1, 100.millis) Source.combine(s1, s2, s3)(MergeLatestWithDefault[Int](_, 0)).runForeach(println) } } // Output: // // List(1, 0, 0) // List(1, 11, 0) // List(1, 11, 101) // List(2, 11, 101) // List(2, 12, 101) // List(3, 12, 101) // List(3, 13, 101) // List(3, 13, 102) // List(3, 13, 103) 仅在Akka StreammergeLatest+上可用,但根据我的简短测试,此重新设计的代码在2.6上似乎可以正常使用。

© www.soinside.com 2019 - 2024. All rights reserved.