合并具有相同物化值的多个源

问题描述 投票:1回答:1

akka流中的combine运算符具有以下签名:

  def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
      strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed]

我有多个来源,都具有相同的Mat。我需要将它们合并起来保存Mat

因此,我需要一个具有以下签名的功能:

  def combine[T, U](first: Source[T, Mat], second: Source[T, Mat], rest: Source[T, Mat]*)(
      strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, Seq[Mat]]

现有的combineMat仅接受两个输入。我需要无限。

Akka的Combine的实现是:

  def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(
      strategy: Int => Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed] =
    Source.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val c = b.add(strategy(rest.size + 2))
      first ~> c.in(0)
      second ~> c.in(1)

      @tailrec def combineRest(idx: Int, i: Iterator[Source[T, _]]): SourceShape[U] =
        if (i.hasNext) {
          i.next() ~> c.in(idx)
          combineRest(idx + 1, i)
        } else SourceShape(c.out)

      combineRest(2, rest.iterator)
    })

它使用不支持SourceShapeMat,所以我认为在这里不起作用。

与此同时,combineMat的实现使用viaMat,这不适用于多个流。

这可能吗?

scala akka-stream
1个回答
0
投票

以下作品:

import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{GraphDSL, Source}
import akka.stream.{Graph, SourceShape, UniformFanInShape}

import scala.collection.immutable

object Combine {
  def combine[T, U, Mat](sources: immutable.Seq[Source[T, Mat]])(strategy: Int => Graph[UniformFanInShape[T, U], Mat]): Source[U, immutable.Seq[Mat]] = {
    Source.fromGraph(GraphDSL.create(sources) {
      implicit builder => {
        sourceShapes => {
          val target = builder.add(strategy(sources.size))

          for ((source, index) <- sourceShapes.zipWithIndex) {
            source ~> target.in(index)
          }

          SourceShape(target.out)
        }
      }
    })
  }
}
© www.soinside.com 2019 - 2024. All rights reserved.