如何使用Akka再次传输文件?

问题描述 投票:0回答:1

我是Akka的初学者。 (我正在使用Java)

我正在使用Akka制作文件传输系统。

当前,我已经完成发送Actor1(本地)-> Actor2(远程)文件。

现在,

当我在传输文件时遇到问题时,我正在考虑如何解决它。然后我有一个问题。问题如下。

如果我在传输文件时丢失了网络连接,则文件传输失败(已完成90%)。几分钟后,我将恢复网络连接。

是否可以传输其余文件数据? (剩余10%)

如果可能,请给我一些建议。

这是我的简单代码。谢谢:)

Actor1(本地)

private Behavior<Event> onTick() {
    ....
    String fileName = "test.zip";
    Source<ByteString, CompletionStage<IOResult>> logs = FileIO.fromPath(Paths.get(fileName));
    logs.runForeach(f -> originalSize += f.size(), mat).thenRun(() -> System.out.println("originalSize : " + originalSize));
    SourceRef<ByteString> logsRef = logs.runWith(StreamRefs.sourceRef(), mat);
    getContext().ask(
            Receiver.FileTransfered.class,
            selectedReceiver,
            timeout,
            responseRef -> new Receiver.TransferFile(logsRef, responseRef, fileName),
            (response, failure) -> {
                if (response != null) {
                    return new TransferCompleted(fileName, response.transferedSize);
                } else {
                    return new JobFailed("Processing timed out", fileName);
                }
            }
    );
}

演员2(远程)

public static Behavior<Command> create() {
    return Behaviors.setup(context -> {
        ...
        Materializer mat = Materializer.createMaterializer(context);
        return Behaviors.receive(Command.class)
                .onMessage(TransferFile.class, command -> {
                    command.sourceRef.getSource().runWith(FileIO.toPath(Paths.get("test.zip")), mat);
                    command.replyTo.tell(new FileTransfered("filename", 1024));
                    return Behaviors.same();
                }).build();
    });
}
java akka akka-stream akka-http
1个回答
0
投票

您需要考虑以下事项,以正确实现具有容错能力的文件传输:

  1. 如何识别必须为给定的文件恢复传输。
  2. 如何找到要从中继续传输的point

以下实现对1和2作了非常简单的假设。

  1. 文件名是唯一的,因此可以用于这种标识。严格来说,这是不正确的,例如,您可以从不同的文件夹中传输具有相同名称的文件。您将必须根据用例重新调整此设置。
  2. 假定接收方的最后/所有写入均正确地写入了所有字节,并且写入字节的总数指示恢复传输的点。如果不能保证这一点,则需要将原始文件逻辑上拆分为多个块,并将每个块的哈希值,大小和位置传送给接收者,接收者必须在其一侧验证块并找到正确的指针以恢复传输。
  3. (多于2 :))此实现将忽略对传输问题的识别,而将重点放在1和2上。

代码:

object Sender {
  sealed trait Command
  case class Upload(file: String) extends Command
  case class StartWithIndex(file: String, index: Long) extends Sender.Command

  def behavior(receiver: ActorRef[Receiver.Command]): Behavior[Sender.Command] = Behaviors.setup[Sender.Command] { ctx =>
    implicit val materializer: Materializer = SystemMaterializer(ctx.system).materializer
    Behaviors.receiveMessage {
      case Upload(file) =>
        receiver.tell(Receiver.InitUpload(file, ctx.self.narrow[StartWithIndex]))
        ctx.log.info(s"Initiating upload of $file")
        Behaviors.same
      case StartWithIndex(file, starWith) =>
        val source = FileIO.fromPath(Paths.get(file), chunkSize = 8192, starWith)
        val ref = source.runWith(StreamRefs.sourceRef())
        ctx.log.info(s"Starting upload of $file")
        receiver.tell(Receiver.Upload(file, ref))
        Behaviors.same
    }
  }
}
object Receiver {
  sealed trait Command

  case class InitUpload(file: String, replyTo: ActorRef[Sender.StartWithIndex]) extends Command

  case class Upload(file: String, fileSource: SourceRef[ByteString]) extends Command

  val behavior: Behavior[Receiver.Command] = Behaviors.setup[Receiver.Command] { ctx =>
    implicit val materializer: Materializer = SystemMaterializer(ctx.system).materializer
    Behaviors.receiveMessage {
      case InitUpload(path, replyTo) =>
        val file = fileAtDestination(path)
        val index = if (file.exists()) file.length else 0
        ctx.log.info(s"Got init command for $file at pointer $index")
        replyTo.tell(Sender.StartWithIndex(path, index.toLong))
        Behaviors.same
      case Upload(path, fileSource) =>
        val file = fileAtDestination(path)
        val sink = if (file.exists()) {
          FileIO.toPath(file.toPath, Set(StandardOpenOption.APPEND, StandardOpenOption.WRITE))
        } else {
          FileIO.toPath(file.toPath, Set(StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE))
        }
        ctx.log.info(s"Saving file into ${file.toPath}")
        fileSource.runWith(sink)
        Behaviors.same
    }
  }
}

一些辅助方法

val destination: File = Files.createTempDirectory("destination").toFile

def fileAtDestination(file: String) = {
  val name = new File(file).getName
  new File(destination, name)
}

def writeRandomToFile(file: File, size: Int): Unit = {
  val out = new FileOutputStream(file, true)
  (0 until size).foreach { _ =>
    out.write(Random.nextPrintableChar())
  }
  out.close()
}

最后还有一些测试代码

// sender and receiver bootstrapping is omitted

//Create some dummy file to upload
val file: Path = Files.createTempFile("test", "test")
writeRandomToFile(file.toFile, 1000)

//Initiate a new upload
sender.tell(Sender.Upload(file.toAbsolutePath.toString))
// Sleep to allow file upload to finish
Thread.sleep(1000)

//Write more data to the file to emulate a failure
writeRandomToFile(file.toFile, 1000)
//Initiate a new upload that will "recover" from the previous upload 
sender.tell(Sender.Upload(file.toAbsolutePath.toString))

最后,整个过程可以定义为

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.