Akka:创建图形阶段而不进行扇动

问题描述 投票:0回答:1

我正在使用GraphStage构建稍微复杂的流,并希望从一个简单的线性图开始逐步进行操作,该线性图不包含任何广播,分区或合并。我得到了使用分区和合并的图表,这些图表是Merge上的出口。但是如何创建自己的插座?

此作品

val filterSwitch = 0

Source.fromGraph(GraphDSL.create() {
    implicit builder =>
        import GraphDSL.Implicits._

   val source = Source(List('1', '2', '3'))

   val fileFlow = Flow[String] ...

   val merge = builder.add(Merge[ByteString](2))

   val partition = builder.add(Partition[ByteString]
        (2, in => if (filterSwitch) 0 else 1))

   source ~> fileFlow ~> partition.in
   partition.out(0) ~> filterFlow ~> merge.in(0)
   partition.out(1) ~> merge.in(1)
   SourceShape(merge.out)
  }
)

我如何创建一个看起来像]的图>

source ~> fileFlow ~> SourceShape(fileFlow.out)

这显然是不正确的,但是我如何从fileFlow创建一个可以转换为SourceShape的出口?

我正在使用GraphStage构建稍微复杂的流,并希望从一个简单的线性图开始逐步进行操作,该线性图不包含任何广播,分区或合并。我...

scala akka akka-stream
1个回答
0
投票

不清楚是要带外部SourceShapeShape还是Source。无论哪种情况,您都可以简单地将Source的String元素映射到ByteString中,如下面的示例(从示例代码略作修改)所示。

© www.soinside.com 2019 - 2024. All rights reserved.