特别处理Akka流的第一个元素

问题描述 投票:8回答:4

是否有一种以特殊方式处理Akka stream的Source第一元素的惯用方法?我现在拥有的是:

    var firstHandled = false
    source.map { elem =>
      if(!firstHandled) {
        //handle specially
        firstHandled = true
      } else {
        //handle normally
      }
    }

谢谢

scala akka akka-stream
4个回答
8
投票

虽然我一般会使用Ramon的答案,你也可以使用前缀为1的prefixAndTailflatMapConcat来实现类似的东西:

val src = Source(List(1, 2, 3, 4, 5))
val fst = Flow[Int].map(i => s"First: $i")
val rst = Flow[Int].map(i => s"Rest:  $i")

val together = src.prefixAndTail(1).flatMapConcat { case (head, tail) =>
  // `head` is a Seq of the prefix elements, which in our case is
  // just the first one. We can convert it to a source of just
  // the first element, processed via our fst flow, and then
  // concatenate `tail`, which is the remainder...
  Source(head).via(fst).concat(tail.via(rst))
}

Await.result(together.runForeach(println), 10.seconds)
// First: 1
// Rest:  2
// Rest:  3
// Rest:  4
// Rest:  5

这当然不仅适用于第一个项目,而且适用于前N个项目,条件是这些项目将被视为严格的集合。


3
投票

使用zipWith

你可以用最初只返回Source的布尔人来源拉链原来的true。然后可以处理此压缩源。

首先,我们需要一个发出布尔值的源:

//true, false, false, false, ...
def firstTrueIterator() : Iterator[Boolean] = 
  (Iterator single true) ++ (Iterator continually false)

def firstTrueSource : Source[Boolean, _] = 
  Source fromIterator firstTrueIterator

然后我们可以定义一个处理两种不同情况的函数:

type Data = ???
type OutputData = ???

def processData(data : Data, firstRun : Boolean) : OutputData = 
  if(firstRun) { ... }
  else { ... }

然后可以在原始来源的zipWith中使用此函数:

val originalSource : Source[Data,_] = ???    

val contingentSource : Source[OutputData,_] =
  originalSource.zipWith(firstTrueSource)(processData)

使用有状态流

您可以创建一个Flow,其中包含与问题中的示例类似的状态,但具有更多功能方法:

def firstRunner(firstCall : (Data) => OutputData,
                otherCalls : (Data) => OutputData) : (Data) => OutputData = {
  var firstRun = true
  (data : Data) => {
    if(firstRun) {
      firstRun = false
      firstCall(data)
    }
    else
      otherCalls(data)
  }
}//end def firstRunner

def firstRunFlow(firstCall :  (Data) => OutputData, 
                 otherCalls : (Data) => OutputData) : Flow[Data, OutputData, _] = 
  Flow[Data] map firstRunner(firstCall, otherCalls)

然后可以将此Flow应用于您的原始来源:

def firstElementFunc(data : Data) : OutputData = ???
def remainingElsFunc(data : Data) : OutputData = ???

val firstSource : Source[OutputData, _] = 
  originalSource via firstRunFlow(firstElementFunc,remainingElseFunc)

“惯用语”

直接回答你的问题需要规定“习惯的方式”。我最后回答那个部分,因为它是编译器最不可验证的,因此更接近意见。我永远不会声称自己是惯用代码的有效分类器。

我个人对akka-streams的体验是,最好将我的视角转换为想象一个Data元素的实际流(我想到的是带有厢式车的火车)。我是否需要将其分解为多个固定尺寸的列车?只有某些厢式车可以通过吗?我可以并排安装另一辆包含Boolean汽车的火车,它可以发出信号吗?由于我对流(火车)的关注,我更喜欢zipWith方法。我最初的方法是始终使用连接在一起的其他流部分。

此外,我发现最好尽可能少地嵌入akka Stream组件中的代码。 firstTrueIteratorprocessData根本不依赖于akka。同时,firstTrueSourcecontingentSource定义几乎没有逻辑。这允许您独立于笨重的ActorSystem测试逻辑,并且可以在Futures或Actors中使用guts。


0
投票

您可以使用prepend将源添加到流中。只需将单项源添加到流中,在流量耗尽后,原始源的其余部分将继续。

https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/prepend.html

 Source(List(1, 2, 3))
  .prepend(Source.single(0))
  .runWith(Sink.foreach(println))

0 1 2 3


0
投票

虽然我更喜欢zip方法,但也可以使用statefulMapConcat

source
  .statefulMapConcat { _ =>
        var firstRun = true
        elem => {
          if (firstRun) {
            //first
            firstRun = false
          } else {
            //not first            
          }
        }
      }
© www.soinside.com 2019 - 2024. All rights reserved.