如何对我的流程中的上游完成反应?

问题描述 投票:0回答:1

假设存在这样定义的Akka流:

def tee = {
  var writer: Writer = ???

  Flow.fromFunction[String, String] { msg =>
    writer.write(msg)
    msg
  }
}

上游完成后,需要冲洗关闭写入器。有没有一种方法可以不诉诸GraphStageLogic等,如此处https://doc.akka.io/docs/akka/current/stream/stream-customize.html所述?

scala akka akka-stream
1个回答
0
投票

如果不将Flow转换为Sink,则无法使用Flow进行此操作。

如果选择接收器,请执行以下操作

  def tee = {
    val writer: Writer = new StringWriter()

    Sink
      .foreach[String] { msg =>
        writer.write(msg)
      }
      .mapMaterializedValue(_.map { done =>
        writer.close()
        done
      })
  }

可以通过akka.stream.scaladsl.StreamConverters进行以下类似操作

    val sink: Sink[String, Future[IOResult]] = {
      StreamConverters.fromOutputStream(() => new org.apache.commons.io.output.WriterOutputStream(writer)).contramap[String](ByteString.apply)
    }
© www.soinside.com 2019 - 2024. All rights reserved.