如何使用flatMapConcat?

问题描述 投票:0回答:1

我正在尝试使用flatMapConcat如下:

Source.empty
      .flatMapConcat {
        Source.fromFuture(Future("hello"))
      }
      .runWith(Sink.foreach(println))
      .onComplete {
        case Success(_) =>
          println()
        case Failure(e) =>
          println(s"Thrown ${e.getMessage}")
      }

并且编译器抱怨:

Error:(31, 26) type mismatch;
 found   : akka.stream.scaladsl.Source[String,akka.NotUsed]
 required: ? => akka.stream.Graph[akka.stream.SourceShape[?],?]
        Source.fromFuture(Future("hello")) 

我究竟做错了什么?

scala akka-stream
1个回答
2
投票

方法flatMapConcat具有以下签名:

def flatMapConcat[T, M](f: (Out) => Graph[SourceShape[T], M]): Repr[T]

在处理SourceString的情况下,它会期望像下面这样的函数:

f: String => Source(Iterable[String])

您的示例代码的另一个问题是Source.empty[T]没有要处理的元素,因此后续的flatMapConcat永远不会被执行。

这是一个使用flatMapConcatSource名称转换每个元素的示例:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()

Source(List("alice", "bob", "jenn")).
  flatMapConcat{ name => Source(List(s"Hi $name", s"Bye $name")) }.
  runWith(Sink.foreach(println))
// Hi alice
// Bye alice
// Hi bob
// Bye bob
// Hi jenn
// Bye jenn

作为旁注,可以用flatMapConcat替换上例中的mapConcatSource(List("alice", "bob", "jenn")). mapConcat{ name => List(s"Hi $name", s"Bye $name") }. runWith(Sink.foreach(println)) 需要更简单的函数签名:

qazxswpoi
© www.soinside.com 2019 - 2024. All rights reserved.