akka.http.scaladsl.model.ParsingException: Unexpected end of multipart entity when uploading a large file to S3 with Akka HTTP

Votes: 0 · Answers: 2

I am trying to upload a large file (90 MB at the moment) to S3 using Akka HTTP and the Alpakka S3 connector. It works fine for small files (25 MB), but when I try to upload the large file (90 MB), I get the following error:

akka.http.scaladsl.model.ParsingException: Unexpected end of multipart entity
at akka.http.scaladsl.unmarshalling.MultipartUnmarshallers$$anonfun$1.applyOrElse(MultipartUnmarshallers.scala:108)
at akka.http.scaladsl.unmarshalling.MultipartUnmarshallers$$anonfun$1.applyOrElse(MultipartUnmarshallers.scala:103)
at akka.stream.impl.fusing.Collect$$anon$6.$anonfun$wrappedPf$1(Ops.scala:227)
at akka.stream.impl.fusing.SupervisedGraphStageLogic.withSupervision(Ops.scala:186)
at akka.stream.impl.fusing.Collect$$anon$6.onPush(Ops.scala:229)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:523)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:510)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:376)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:606)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:485)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:581)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:749)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:739)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:765)
at akka.actor.Actor.aroundReceive(Actor.scala:539)
at akka.actor.Actor.aroundReceive$(Actor.scala:537)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:671)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:614)
at akka.actor.ActorCell.invoke(ActorCell.scala:583)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
at akka.dispatch.Mailbox.run(Mailbox.scala:229)
at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Although I receive a success message at the end, the file is not uploaded completely. Only 45-50 MB make it to S3.

I am using the following code: S3Utility.scala

class S3Utility(implicit as: ActorSystem, m: Materializer) {
  private val bucketName = "test"

  def sink(fileInfo: FileInfo): Sink[ByteString, Future[MultipartUploadResult]] = {
    val fileName = fileInfo.fileName
    S3.multipartUpload(bucketName, fileName)
  }
}

The route:

def uploadLargeFile: Route =
  post {
    path("import" / "file") {
      extractMaterializer { implicit materializer =>
        withoutSizeLimit {
          fileUpload("file") {
            case (metadata, byteSource) =>
              logger.info(s"Request received to import large file: ${metadata.fileName}")
              val uploadFuture = byteSource.runWith(s3Utility.sink(metadata))
              onComplete(uploadFuture) {
                case Success(result) =>
                  logger.info(s"Successfully uploaded file")
                  complete(StatusCodes.OK)
                case Failure(ex) =>
                  logger.error("Error in uploading file", ex)
                  complete(StatusCodes.FailedDependency, ex.getMessage)
              }
          }
        }
      }
    }
  }

Any help would be appreciated. Thanks.

scala akka-stream akka-http alpakka
2 Answers
1 vote

Strategy 1

You could split the file into smaller parts and retry each part independently, using the AWS SDK's multipart upload API directly. Here is sample code:

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.*;

AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
        .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("some-kind-of-endpoint"))
        .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("user", "pass")))
        .disableChunkedEncoding()
        .withPathStyleAccessEnabled(true)
        .build();

// Create a list of PartETag objects. You get one of these
// for each part upload.
List<PartETag> partETags = new ArrayList<PartETag>();

// Step 1: Initialize.
InitiateMultipartUploadRequest initRequest =
        new InitiateMultipartUploadRequest("bucket", "key");
InitiateMultipartUploadResult initResponse =
        s3Client.initiateMultipartUpload(initRequest);

File file = new File("filepath");
long contentLength = file.length();
long partSize = 5242880; // Set part size to 5 MB.

try {
    // Step 2: Upload parts.
    long filePosition = 0;
    for (int i = 1; filePosition < contentLength; i++) {
        // Last part can be less than 5 MB. Adjust part size.
        partSize = Math.min(partSize, (contentLength - filePosition));

        // Create a request to upload a part.
        UploadPartRequest uploadRequest = new UploadPartRequest()
                .withBucketName("bucket").withKey("key")
                .withUploadId(initResponse.getUploadId()).withPartNumber(i)
                .withFileOffset(filePosition)
                .withFile(file)
                .withPartSize(partSize);

        // Upload the part and add its ETag to our list.
        partETags.add(s3Client.uploadPart(uploadRequest).getPartETag());

        filePosition += partSize;
    }

    // Step 3: Complete the upload.
    CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(
            "bucket", "key", initResponse.getUploadId(), partETags);
    s3Client.completeMultipartUpload(compRequest);
} catch (Exception e) {
    // Abort the upload so incomplete parts don't accumulate in the bucket.
    s3Client.abortMultipartUpload(new AbortMultipartUploadRequest(
            "bucket", "key", initResponse.getUploadId()));
}
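The part-splitting arithmetic in Step 2 is easy to get off by one, so it can help to check it in isolation. The helper below (hypothetical, not part of the AWS SDK) reproduces the loop's part-size calculation without touching S3:

```java
import java.util.ArrayList;
import java.util.List;

public class PartSizes {
    // Returns the size of each part the upload loop above would produce
    // for a file of contentLength bytes split into partSize-byte parts.
    static List<Long> partSizes(long contentLength, long partSize) {
        List<Long> sizes = new ArrayList<>();
        long filePosition = 0;
        while (filePosition < contentLength) {
            // The last part can be smaller than partSize.
            long size = Math.min(partSize, contentLength - filePosition);
            sizes.add(size);
            filePosition += size;
        }
        return sizes;
    }

    public static void main(String[] args) {
        // A 12 MB file split into 5 MB parts: two full parts plus a final 2 MB part.
        System.out.println(partSizes(12L * 1024 * 1024, 5L * 1024 * 1024));
        // prints [5242880, 5242880, 2097152]
    }
}
```

For the 90 MB file from the question this yields exactly eighteen 5 MB parts, each of which can be retried on its own if it fails.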

Strategy 2

Increase the Akka HTTP server's idle-timeout (or simply set it to infinite), like so:

akka.http.server.idle-timeout=infinite

This increases the period for which the server tolerates an idle connection. By default it is 60 seconds; if the upload does not finish within that window, the server closes the connection and the "Unexpected end of multipart entity" error is thrown.
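If an infinite timeout is too permissive for production, a large finite value works too. A minimal application.conf sketch (the specific durations are illustrative, pick values that fit your largest uploads):

```hocon
akka.http.server {
  # Close idle connections only after 10 minutes instead of the 60 s default.
  idle-timeout = 10 minutes
  # The whole request must also complete within this window (default is 20 s).
  request-timeout = 5 minutes
}
```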


0 votes

I am hitting a new exception when using multipart upload. The upload to S3 fails and is aborted with:

org.bouncycastle.crypto.io.InvalidCipherTextIOException: Error finalising cipher
Caused by: org.bouncycastle.crypto.InvalidCipherTextException: mac check in GCM failed
