我是Akka的初学者。 (我正在使用Java)
我正在使用Akka制作文件传输系统。
当前,我已经完成发送Actor1(本地)-> Actor2(远程)文件。
现在,
当我在传输文件时遇到问题时,我正在考虑如何解决它。然后我有一个问题。问题如下。
如果我在传输文件时丢失了网络连接,则文件传输失败(已完成90%)。几分钟后,我将恢复网络连接。
是否可以传输其余文件数据? (剩余10%)
如果可能,请给我一些建议。
这是我的简单代码。谢谢:)
Actor1(本地)
private Behavior<Event> onTick() {
....
String fileName = "test.zip";
Source<ByteString, CompletionStage<IOResult>> logs = FileIO.fromPath(Paths.get(fileName));
logs.runForeach(f -> originalSize += f.size(), mat).thenRun(() -> System.out.println("originalSize : " + originalSize));
SourceRef<ByteString> logsRef = logs.runWith(StreamRefs.sourceRef(), mat);
getContext().ask(
Receiver.FileTransfered.class,
selectedReceiver,
timeout,
responseRef -> new Receiver.TransferFile(logsRef, responseRef, fileName),
(response, failure) -> {
if (response != null) {
return new TransferCompleted(fileName, response.transferedSize);
} else {
return new JobFailed("Processing timed out", fileName);
}
}
);
}
演员2(远程)
public static Behavior<Command> create() {
return Behaviors.setup(context -> {
...
Materializer mat = Materializer.createMaterializer(context);
return Behaviors.receive(Command.class)
.onMessage(TransferFile.class, command -> {
command.sourceRef.getSource().runWith(FileIO.toPath(Paths.get("test.zip")), mat);
command.replyTo.tell(new FileTransfered("filename", 1024));
return Behaviors.same();
}).build();
});
}
您需要考虑以下事项,以正确实现具有容错能力的文件传输:
以下实现对1和2作了非常简单的假设。
代码:
object Sender {
sealed trait Command
case class Upload(file: String) extends Command
case class StartWithIndex(file: String, index: Long) extends Sender.Command
def behavior(receiver: ActorRef[Receiver.Command]): Behavior[Sender.Command] = Behaviors.setup[Sender.Command] { ctx =>
implicit val materializer: Materializer = SystemMaterializer(ctx.system).materializer
Behaviors.receiveMessage {
case Upload(file) =>
receiver.tell(Receiver.InitUpload(file, ctx.self.narrow[StartWithIndex]))
ctx.log.info(s"Initiating upload of $file")
Behaviors.same
case StartWithIndex(file, starWith) =>
val source = FileIO.fromPath(Paths.get(file), chunkSize = 8192, starWith)
val ref = source.runWith(StreamRefs.sourceRef())
ctx.log.info(s"Starting upload of $file")
receiver.tell(Receiver.Upload(file, ref))
Behaviors.same
}
}
}
object Receiver {
sealed trait Command
case class InitUpload(file: String, replyTo: ActorRef[Sender.StartWithIndex]) extends Command
case class Upload(file: String, fileSource: SourceRef[ByteString]) extends Command
val behavior: Behavior[Receiver.Command] = Behaviors.setup[Receiver.Command] { ctx =>
implicit val materializer: Materializer = SystemMaterializer(ctx.system).materializer
Behaviors.receiveMessage {
case InitUpload(path, replyTo) =>
val file = fileAtDestination(path)
val index = if (file.exists()) file.length else 0
ctx.log.info(s"Got init command for $file at pointer $index")
replyTo.tell(Sender.StartWithIndex(path, index.toLong))
Behaviors.same
case Upload(path, fileSource) =>
val file = fileAtDestination(path)
val sink = if (file.exists()) {
FileIO.toPath(file.toPath, Set(StandardOpenOption.APPEND, StandardOpenOption.WRITE))
} else {
FileIO.toPath(file.toPath, Set(StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE))
}
ctx.log.info(s"Saving file into ${file.toPath}")
fileSource.runWith(sink)
Behaviors.same
}
}
}
一些辅助方法
val destination: File = Files.createTempDirectory("destination").toFile
def fileAtDestination(file: String) = {
val name = new File(file).getName
new File(destination, name)
}
def writeRandomToFile(file: File, size: Int): Unit = {
val out = new FileOutputStream(file, true)
(0 until size).foreach { _ =>
out.write(Random.nextPrintableChar())
}
out.close()
}
最后还有一些测试代码
// sender and receiver bootstrapping is omitted
//Create some dummy file to upload
val file: Path = Files.createTempFile("test", "test")
writeRandomToFile(file.toFile, 1000)
//Initiate a new upload
sender.tell(Sender.Upload(file.toAbsolutePath.toString))
// Sleep to allow file upload to finish
Thread.sleep(1000)
//Write more data to the file to emulate a failure
writeRandomToFile(file.toFile, 1000)
//Initiate a new upload that will "recover" from the previous upload
sender.tell(Sender.Upload(file.toAbsolutePath.toString))
最后,整个过程可以定义为