复合流程的目的是什么(来自Sink和Source)?

问题描述 投票:0回答:2

我正在尝试从website理解复合流(来自Sink和Source),它们代表如下:

enter image description here

有人可以提供一个使用复合流程的例子。 什么时候应该使用它?

akka akka-stream
2个回答
1
投票

Flow.fromSinkAndSource提供了一种方便的方法来组装一个由flow作为输入组成的sink和一个source作为其未连接的输出,这可以通过下图(API链接中提供)得到最好的说明:

  +----------------------------------------------+
  | Resulting Flow[I, O, NotUsed]                |
  |                                              |
  |  +---------+                  +-----------+  |
  |  |         |                  |           |  |
I ~~>| Sink[I] | [no-connection!] | Source[O] | ~~> O
  |  |         |                  |           |  |
  |  +---------+                  +-----------+  |
  +----------------------------------------------+

如@ gabrielgiussi的回答所示,它通常用于需要将现有source(或flow)的输出“切换”到某个不同输出的情况 - 用于测试目的或什么不是。这是一个简单的例子:

import akka.actor.ActorSystem
import akka.stream.scaladsl._
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()

val switchFlow = Flow.fromSinkAndSource( Sink.ignore, Source(List("a", "b", "c")) )

Source(1 to 5).via(switchFlow).runForeach(println)
// res1: scala.concurrent.Future[akka.Done] = Future(Success(Done))
// a
// b
// c

值得注意的是,该方法的“Mat”版本fromSinkAndSourceMat有一些有趣的用例。一个例子是使用它来保持half-closed WebSockets打开,使用Source.maybe[T]维护一个Promise[Option[T]]作为物化值,当想要关闭连接时将完成。以下是Akka-http WebSockets client support文档中相关部分的示例代码:

// using Source.maybe materializes into a promise
// which will allow us to complete the source later
val flow: Flow[Message, Message, Promise[Option[Message]]] =
  Flow.fromSinkAndSourceMat(
    Sink.foreach[Message](println),
    Source.maybe[Message])(Keep.right)

val (upgradeResponse, promise) =
  Http().singleWebSocketRequest(
    WebSocketRequest("ws://example.com:8080/some/path"),
    flow)

// at some later time we want to disconnect
promise.success(None)

1
投票

也许在某些情况下,您只需要提供Flow,在某些情况下,您需要NoOp Flow。然后你可以做到

Flow.fromSinkAndSource(Sink.ignore,Source.empty)

或者忽略Source中的每个元素并使用另一个元素

Flow.fromSinkAndSource(Sink.ignore,Source.tick(1.second,1.second,"something"))
© www.soinside.com 2019 - 2024. All rights reserved.