假设存在这样定义的Akka流:
def tee = {
var writer: Writer = ???
Flow.fromFunction[String, String] { msg =>
writer.write(msg)
msg
}
}
上游完成后,需要冲洗关闭写入器。有没有一种方法可以不诉诸GraphStageLogic
等,如此处https://doc.akka.io/docs/akka/current/stream/stream-customize.html所述?
如果不将Flow转换为Sink,则无法使用Flow进行此操作。
如果选择接收器,请执行以下操作
def tee = {
val writer: Writer = new StringWriter()
Sink
.foreach[String] { msg =>
writer.write(msg)
}
.mapMaterializedValue(_.map { done =>
writer.close()
done
})
}
可以通过akka.stream.scaladsl.StreamConverters
进行以下类似操作
val sink: Sink[String, Future[IOResult]] = {
StreamConverters.fromOutputStream(() => new org.apache.commons.io.output.WriterOutputStream(writer)).contramap[String](ByteString.apply)
}