我正在使用akka-grpc来生成客户端绑定。它们通常具有形式
func[A, B](in: Source[A]) : Source[B]
,
即他们消耗Source[A]
并提供Source[B]
。
现在,我想将func
变成Flow[A, B]
,将它们与akka-stream一起使用。
解决方案是:
def SourceProcessor[In, Out](f : Source[In, NotUsed] => Source[Out, NotUsed]): Flow[In, Out, NotUsed] =
Flow[In].prefixAndTail(0).flatMapConcat { case (Nil, in) => f(in) }
它使用qazxsw poi劫持潜在的qazxsw poi。