获取重载方法〜>使用替代方案Akka广播

问题描述 投票:0回答:1

我正在尝试将传入的 Source[ByteString, Any] 广播到 2 个不同的流,然后扇入(zip)输出。但是我收到错误“重载方法〜>使用替代方案”。

val byteStringSource: Source[ByteString, Any] = Source.fromIterator(() => (1 to 10).map(i => ByteString(s"Element $i")).iterator)

  val incrementer = Flow[String].map{ x =>
    x
  }
  val multiplier = Flow[String].map{ x =>
    x
  }

  val output = Sink.foreach[(Type1, Type2)] { n1 =>
    println(s"First obj is ${(n1._1.toString)} & second obj is ${n1._2.toString}")
  }

  val graph = RunnableGraph.fromGraph(
    GraphDSL.create() {implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[String](2))
      val zip = builder.add(Zip[Type1, Type2])//fan-in operator

      byteStringSource ~> broadcast
      broadcast.out(0) ~> incrementer ~> zip.in0
      broadcast.out(1) ~> multiplier ~> zip.in1

      zip.out ~> output
      ClosedShape
    }
  )
  graph.run()

我该如何解决这个问题?

scala akka akka-stream
1个回答
0
投票

您的源的类型为

ByteString
,但广播元素的类型为
String
,因此
~>
运算符不适用。

对于您的简单示例,您可以删除 ByteString 并仅使用纯字符串。如果您的实际情况使用更复杂的类型,您可以将原始源映射到广播将接受的类型,所以类似

val theSource = byteStringSource.map(byteString => ...)

然后在你的 GraphDSL 中使用

theSource

© www.soinside.com 2019 - 2024. All rights reserved.