最近我开始使用Akka,我用它来创建一个使用Akka HTTP上传文件的REST API。该文件可以有数百万条记录,对于每条记录,我需要执行一些验证和业务逻辑。我对我的actor进行建模的方式是,root actor接收文件流,将bytes转换为String,然后按行分隔符拆分记录。执行此操作后,它将流(按记录记录)发送给另一个actor进行处理,然后根据某些分组将记录分发给其他actor。要将蒸汽从主根演员发送到演员进行处理我正在使用Sink.actorRefWithAck
。
这对于一个小文件工作正常,但对于我观察到的大文件,我得到多个块并且第一个块正在处理。如果我根据负载添加Thread.sleep
几秒钟,那么它正在处理整个文件。我想知道是否有任何方式我可以知道流是否已被处理完全消耗掉,以便我不必处理Thread.sleep
。这是我使用的代码片段:
val AckMessage = DefaultFileUploadProcessActor.Ack
val receiver = context.system.actorOf(
Props(new DefaultFileUploadProcessActor(uuid, sourceId)(self, ackWith = AckMessage)))
// sent from stream to actor to indicate start, end or failure of stream:
val InitMessage = DefaultFileUploadProcessActor.StreamInitialized
val OnCompleteMessage = DefaultFileUploadProcessActor.StreamCompleted
val onErrorMessage = (ex: Throwable) => DefaultFileUploadProcessActor.StreamFailure(ex)
val actorSink = Sink.actorRefWithAck(
receiver,
onInitMessage = InitMessage,
ackMessage = AckMessage,
onCompleteMessage = OnCompleteMessage,
onFailureMessage = onErrorMessage
)
val processStream =
fileStream
.map(byte => byte.utf8String.split(System.lineSeparator()))
.runWith(actorSink)
Thread.sleep(9000)
log.info(s"completed distribution of data to the actors")
sender() ! ActionPerformed(uuid, "Done")
任何有关我所采取的方法的专家建议都将受到高度赞赏。
如果您只有一个文件的Source,则可以通过等待从runWith方法返回的Future来等待流完成。
如果您有多个文件的源,您应该写如下:
filesSource
.mapAsync(1)(data => (receiver ? data).mapTo[ProcessingResult])
.mapAsync(1)(processingResult => (resultListener ? processingResult).mapTo[ListenerResponse])
.runWith(Sink.ignore)
当流成功完成或失败时,receiver
演员将收到OnCompleteMessage
或onErrorMessage
,所以你应该在接收器receive
演员的DefaultFileUploadProcessActor
块中处理这些消息。
假设fileStream
是Source[ByteString, Future[IOResult]
,一个想法是保留源的物化价值,然后在物化价值完成后触发对sender()
的回复:
val processStream: Future[IOResult] =
fileStream
.map(_.utf8String.split(System.lineSeparator()))
.to(actorSink)
.run()
processStream.onComplete {
case Success(_) =>
log.info("completed distribution of data to the actors")
sender() ! ActionPerformed(uuid, "Done")
case Failure(t) =>
// ...
}
上述方法可确保在通知发件人之前使用整个文件。
请注意,Akka Streams有一个Framing
对象,可以解析ByteString
流中的行:
val processStream: Future[IOResult] =
fileStream
.via(Framing.delimiter(
ByteString(System.lineSeparator()),
maximumFrameLenght = 256,
allowTruncation = true))
.map(_.ut8String)
.to(actorSink) // the actor will have to expect String, not Array[String], messages
.run()