Source.Single()会提前终止具有2个源的流

问题描述 投票:0回答:1

我已经定义了一个简单的图形,该图形将恒定流(通过Source.Single(5000)定义)与非恒定源(例如Source(1 to 100))组合在一起。我的图形通过ZipLatestWith运算符将两个数字加在一起。目的是使流的输出为5001, 5002, 5003, 5004, ..., 5100。实际上,程序的输出只是5000,然后流可能会终止,因为Single源已完成。

[如何获得期望的结果,将具有5000恒定值的源与非恒定源的每个值组合在一起?注意,由于概念上的原因(与该特定示例无关),恒定源仍然是流的源非常重要。完整的代码示例如下,仅将数字5000打印到控制台。

Main.scala:

import akka.NotUsed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorSystem, Behavior, Terminated}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, ZipLatestWith}
import akka.stream.{FlowShape, Materializer}

object Main {
  def apply(): Behavior[NotUsed] = {
    Behaviors.setup { context =>

      val constantStream: Source[Int, NotUsed] = Source.single(5000)
      val graph: Flow[Int, Int, NotUsed] = Flow.fromGraph(GraphDSL.create(){
        implicit builder =>
          import GraphDSL.Implicits._

          val zipper = builder.add(ZipLatestWith[Int, Int, Int]((a: Int, b: Int) => a * b))
          constantStream ~> zipper.in1

          FlowShape(zipper.in0, zipper.out)
      })

      Source(1 to 100)
        .via(graph)
        .to(Sink.foreach(println)).run()(Materializer.apply(context))

      Behaviors.receiveSignal {
        case (_, Terminated(_)) =>
          Behaviors.stopped
      }
    }
  }

  def main(args: Array[String]): Unit = {
    ActorSystem(Main(), "test")
  }
}

build.sbt:

scalaVersion := "2.12.6"
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.6.0"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % "2.6.0"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
scala akka akka-stream
1个回答
0
投票

仅使用Source.repeat(5000)

© www.soinside.com 2019 - 2024. All rights reserved.