如何读取包含S3上的实木复合地板的.tar文件作为Spark中的数据帧?

问题描述 投票:1回答:1

我需要在使用Scala / Spark的S3上加载一个.tar文件,其中包含具有不同架构的多个镶木地板。理想情况下,我想将这些实木复合地板之一读入Spark数据帧。我尝试获取s3对象,然后使用org.apache.commons.compress.archivers.tar.TarArchiveInputStream转换为tar输入流,它能够创建tar输入流,但无法读取tar条目。

val s3client: AmazonS3 = AmazonS3ClientBuilder.
      standard().
      withCredentials(new InstanceProfileCredentialsProvider()).
      withRegion(my_region).
      build();

val tarFile = s3client.getObject(my_bucket, my_tar_file)
val tarInputStream = new TarArchiveInputStream(tarFile.getObjectContent)
tarInputStream.getNextTarEntry() <-- error thrown in this line

错误:

java.io.IOException: Error detected parsing the header
  at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getNextTarEntry(TarArchiveInputStream.java:240)
  ... 52 elided
Caused by: java.lang.IllegalArgumentException: Invalid byte 48 at offset 7 in '00755{NUL}00' len=8
  at org.apache.commons.compress.archivers.tar.TarUtils.parseOctal(TarUtils.java:127)
  at org.apache.commons.compress.archivers.tar.TarUtils.parseOctalOrBinary(TarUtils.java:171)
  at org.apache.commons.compress.archivers.tar.TarArchiveEntry.parseTarHeader(TarArchiveEntry.java:935)
  at org.apache.commons.compress.archivers.tar.TarArchiveEntry.parseTarHeader(TarArchiveEntry.java:924)
  at org.apache.commons.compress.archivers.tar.TarArchiveEntry.<init>(TarArchiveEntry.java:328)
  at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getNextTarEntry(TarArchiveInputStream.java:238)

是否有人知道在Spark中的s3上提取部分tar文件的正确方法?

scala apache-spark amazon-s3 tar parquet
1个回答
0
投票

在您的情况下,您将对象作为InputStream传递。因此,我的建议是将其作为GzipInputstream传递,然后阅读条目:

val tarInputStream = new TarArchiveInputStream(tarFile.getObjectContent)

val tarInputStream = new TarArchiveInputStream(new GZIPInputStream(tarFile))
val entry: TarArchiveEntry = readEntries(tarInputStream)
def readEntries(tarInputStream: TarArchiveInputStream): TarArchiveEntry = {
  var currentEntry = Option(tarInputStream.getNextTarEntry())
  // you can use functional approach with foldLeft, reduce or something else or while loop
  // implementation details here
}

您可以找到如何使用TarArchiveInputStream的用法here

© www.soinside.com 2019 - 2024. All rights reserved.