我想了解akka流中的Keep.both
但我在互联网上找不到容易的东西。
有人可以提供一个关于Keep.right
和Keep.both
的非常简单的例子。
我试过了:
implicit val system = ActorSystem("KafkaProducer")
implicit val materializer = ActorMaterializer()
val source = Source.single("Hello")_
val sink = Sink.fold[String, String]("")(_ + _)
val runnable: RunnableGraph[Future[String]] = source.toMat(sink)(Keep.left)
runnable.run()
我知道,这可能不是一个好例子,希望有人提供一个更好的例子。
最简单的情况是,您需要一个流来处理您要(1)从流外部提供的一系列元素,并且您需要知道(2)流何时完成处理所有元素。
对于(1)你可以使用物化到队列中的Source.queue
,你可以通过offer将元素推送到它。
val source = Source.queue[String](100,OverflowStrategy.backpressure)
对于(2)你可以使用一个Sink.foreach
,该Future[Done]
将在达到流的正常末端时用Success
完成,或者如果在流中发出故障信号,则用Failure
完成。
val sink = Sink.foreach[String](println)
然后,您需要连接源和接收器以及Keep.both物化值。
val materializedValues: (SourceQueueWithComplete[String], Future[Done]) = source.toMat(sink)(Keep.both).run()