保持。卡夫卡流的例子

问题描述 投票:0回答:1

我想了解akka流中的Keep.both但我在互联网上找不到容易的东西。

有人可以提供一个关于Keep.rightKeep.both的非常简单的例子。

我试过了:

   implicit val system = ActorSystem("KafkaProducer")
    implicit val materializer = ActorMaterializer()

    val source = Source.single("Hello")_
    val sink = Sink.fold[String, String]("")(_ + _)

    val runnable: RunnableGraph[Future[String]] = source.toMat(sink)(Keep.left)
    runnable.run() 

我知道,这可能不是一个好例子,希望有人提供一个更好的例子。

scala akka-stream
1个回答
0
投票

最简单的情况是,您需要一个流来处理您要(1)从流外部提供的一系列元素,并且您需要知道(2)流何时完成处理所有元素。

对于(1)你可以使用物化到队列中的Source.queue,你可以通过offer将元素推送到它。

val source = Source.queue[String](100,OverflowStrategy.backpressure)

对于(2)你可以使用一个Sink.foreach,该Future[Done]将在达到流的正常末端时用Success完成,或者如果在流中发出故障信号,则用Failure完成。

val sink = Sink.foreach[String](println)

然后,您需要连接源和接收器以及Keep.both物化值。

val materializedValues: (SourceQueueWithComplete[String], Future[Done]) = source.toMat(sink)(Keep.both).run()
© www.soinside.com 2019 - 2024. All rights reserved.