我已经定义了一个简单的图形,该图形将恒定流(通过Source.Single(5000)
定义)与非恒定源(例如Source(1 to 100)
)组合在一起。我的图形通过ZipLatestWith
运算符将两个数字加在一起。目的是使流的输出为5001, 5002, 5003, 5004, ..., 5100
。实际上,程序的输出只是5000
,然后流可能会终止,因为Single
源已完成。
[如何获得期望的结果,将具有5000
恒定值的源与非恒定源的每个值组合在一起?注意,由于概念上的原因(与该特定示例无关),恒定源仍然是流的源非常重要。完整的代码示例如下,仅将数字5000
打印到控制台。
Main.scala:
import akka.NotUsed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorSystem, Behavior, Terminated}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, ZipLatestWith}
import akka.stream.{FlowShape, Materializer}
object Main {
def apply(): Behavior[NotUsed] = {
Behaviors.setup { context =>
val constantStream: Source[Int, NotUsed] = Source.single(5000)
val graph: Flow[Int, Int, NotUsed] = Flow.fromGraph(GraphDSL.create(){
implicit builder =>
import GraphDSL.Implicits._
val zipper = builder.add(ZipLatestWith[Int, Int, Int]((a: Int, b: Int) => a * b))
constantStream ~> zipper.in1
FlowShape(zipper.in0, zipper.out)
})
Source(1 to 100)
.via(graph)
.to(Sink.foreach(println)).run()(Materializer.apply(context))
Behaviors.receiveSignal {
case (_, Terminated(_)) =>
Behaviors.stopped
}
}
}
def main(args: Array[String]): Unit = {
ActorSystem(Main(), "test")
}
}
build.sbt:
scalaVersion := "2.12.6"
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.6.0"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % "2.6.0"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
仅使用Source.repeat(5000)
。