documentation的官方MergeLatest
声明:
[MergeLatest为从某个输入流发出的每个元素发出列表,但仅在每个输入流发出至少一个元素之后。
我的问题是:可以绕开它吗?例如,我们是否可以提供一个默认值,使其从任何输入流中至少接收到一个元素后立即开始生成列表?
以下应该是新行为:
(1,0,0)
(2,0,0)
(2,1,0)
(2,1,1)
(2,1,2)
而不是:
(2,1,1)
(2,1,2)
因为我也需要将这些第一个列表也推送到输出流中
对于每个传入流,您可以使用Source.single(0).concat
发出零,然后发出其余流:
def withInitialValue[A](source: Source[A, NotUsed], a: A): Source[A, NotUsed] =
Source.single(a).concat(source)
[很遗憾,mergeLatest
没有提供这种选项。而且似乎没有任何Stream运算符可以轻松地做到这一点。一种方法是将MergeLatest
用于特定需求。好消息是,由于相关代码实现是MergeLatest
的标准GraphStage
,因此必要的代码更改非常简单。
UniformFanInShape
在这种情况下,只需要很少的代码更改。除了要在数组
import akka.stream.scaladsl._ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.stream.{ Attributes, Inlet, Outlet, UniformFanInShape } import scala.collection.immutable object MergeLatestWithDefault { def apply[T](inputPorts: Int, default: T, eagerComplete: Boolean = false): GraphStage[UniformFanInShape[T, List[T]]] = new MergeLatestWithDefault[T, List[T]](inputPorts, default, eagerComplete)(_.toList) } final class MergeLatestWithDefault[T, M](val inputPorts: Int, val default: T, val eagerClose: Boolean)(buildElem: Array[T] => M) extends GraphStage[UniformFanInShape[T, M]] { require(inputPorts >= 1, "input ports must be >= 1") val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("MergeLatestWithDefault.in" + i)) val out: Outlet[M] = Outlet[M]("MergeLatestWithDefault.out") override val shape: UniformFanInShape[T, M] = UniformFanInShape(out, in: _*) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { private val activeStreams: java.util.HashSet[Int] = new java.util.HashSet[Int]() private var runningUpstreams: Int = inputPorts private def upstreamsClosed: Boolean = runningUpstreams == 0 private val messages: Array[Any] = Array.fill[Any](inputPorts)(default) override def preStart(): Unit = in.foreach(tryPull) in.zipWithIndex.foreach { case (input, index) => setHandler( input, new InHandler { override def onPush(): Unit = { messages.update(index, grab(input)) activeStreams.add(index) emit(out, buildElem(messages.asInstanceOf[Array[T]])) tryPull(input) } override def onUpstreamFinish(): Unit = { if (!eagerClose) { runningUpstreams -= 1 if (upstreamsClosed) completeStage() } else completeStage() } }) } override def onPull(): Unit = { var i = 0 while (i < inputPorts) { if (!hasBeenPulled(in(i))) tryPull(in(i)) i += 1 } } setHandler(out, this) } override def toString = "MergeLatestWithDefault" }
中填充default
的附加参数外,唯一的变化是messages
中的emit
不再是有条件的。
正在测试:
onPush
作为奖励,虽然
import akka.actor.ActorSystem object CustomMerge { def main(args: Array[String]): Unit = { implicit val system = ActorSystem("system") val s1 = Source(1 to 3) val s2 = Source(11 to 13).throttle(1, 50.millis) val s3 = Source(101 to 103).throttle(1, 100.millis) Source.combine(s1, s2, s3)(MergeLatestWithDefault[Int](_, 0)).runForeach(println) } } // Output: // // List(1, 0, 0) // List(1, 11, 0) // List(1, 11, 101) // List(2, 11, 101) // List(2, 12, 101) // List(3, 12, 101) // List(3, 13, 101) // List(3, 13, 102) // List(3, 13, 103)
仅在Akka StreammergeLatest
+上可用,但根据我的简短测试,此重新设计的代码在2.6
上似乎可以正常使用。