如何在Akka actor中等待文件上传流完成

问题描述 投票:0回答:3

最近我开始使用Akka,我用它来创建一个使用Akka HTTP上传文件的REST API。该文件可以有数百万条记录,对于每条记录,我需要执行一些验证和业务逻辑。我对我的actor进行建模的方式是,root actor接收文件流,将bytes转换为String,然后按行分隔符拆分记录。执行此操作后,它将流(按记录记录)发送给另一个actor进行处理,然后根据某些分组将记录分发给其他actor。要将蒸汽从主根演员发送到演员进行处理我正在使用Sink.actorRefWithAck

这对于一个小文件工作正常,但对于我观察到的大文件,我得到多个块并且第一个块正在处理。如果我根据负载添加Thread.sleep几秒钟,那么它正在处理整个文件。我想知道是否有任何方式我可以知道流是否已被处理完全消耗掉,以便我不必处理Thread.sleep。这是我使用的代码片段:

val AckMessage = DefaultFileUploadProcessActor.Ack
val receiver = context.system.actorOf(
  Props(new DefaultFileUploadProcessActor(uuid, sourceId)(self, ackWith = AckMessage)))
// sent from stream to actor to indicate start, end or failure of stream:
val InitMessage = DefaultFileUploadProcessActor.StreamInitialized
val OnCompleteMessage = DefaultFileUploadProcessActor.StreamCompleted
val onErrorMessage = (ex: Throwable) => DefaultFileUploadProcessActor.StreamFailure(ex)

val actorSink = Sink.actorRefWithAck(
  receiver,
  onInitMessage = InitMessage,
  ackMessage = AckMessage,
  onCompleteMessage = OnCompleteMessage,
  onFailureMessage = onErrorMessage
)

val processStream =
  fileStream
    .map(byte => byte.utf8String.split(System.lineSeparator()))
    .runWith(actorSink)

Thread.sleep(9000)
log.info(s"completed distribution of data to the actors")
sender() ! ActionPerformed(uuid, "Done")

任何有关我所采取的方法的专家建议都将受到高度赞赏。

scala akka akka-stream akka-http
3个回答
0
投票

如果您只有一个文件的Source,则可以通过等待从runWith方法返回的Future来等待流完成。

如果您有多个文件的源,您应该写如下:

filesSource
  .mapAsync(1)(data => (receiver ? data).mapTo[ProcessingResult])
  .mapAsync(1)(processingResult => (resultListener ? processingResult).mapTo[ListenerResponse])
  .runWith(Sink.ignore)

0
投票

当流成功完成或失败时,receiver演员将收到OnCompleteMessageonErrorMessage,所以你应该在接收器receive演员的DefaultFileUploadProcessActor块中处理这些消息。


0
投票

假设fileStreamSource[ByteString, Future[IOResult],一个想法是保留源的物化价值,然后在物化价值完成后触发对sender()的回复:

val processStream: Future[IOResult] =
  fileStream
    .map(_.utf8String.split(System.lineSeparator()))
    .to(actorSink)
    .run()

processStream.onComplete {
  case Success(_) =>
    log.info("completed distribution of data to the actors")
    sender() ! ActionPerformed(uuid, "Done")
  case Failure(t) =>
    // ...
}

上述方法可确保在通知发件人之前使用整个文件。

请注意,Akka Streams有一个Framing对象,可以解析ByteString流中的行:

val processStream: Future[IOResult] =
  fileStream
    .via(Framing.delimiter(
      ByteString(System.lineSeparator()),
      maximumFrameLenght = 256,
      allowTruncation = true))
    .map(_.ut8String)
    .to(actorSink) // the actor will have to expect String, not Array[String], messages
    .run()
© www.soinside.com 2019 - 2024. All rights reserved.