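I need to load a .tar file stored on S3 using Scala/Spark. The archive contains multiple Parquet files with different schemas. Ideally, I would like to read just one of these Parquet files into a Spark DataFrame. I tried fetching the S3 object and wrapping its content in org.apache.commons.compress.archivers.tar.TarArchiveInputStream. Creating the tar input stream works, but reading a tar entry fails.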
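import com.amazonaws.auth.InstanceProfileCredentialsProvider
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream

val s3client: AmazonS3 = AmazonS3ClientBuilder
  .standard()
  .withCredentials(new InstanceProfileCredentialsProvider())
  .withRegion(my_region)
  .build()
// Fetch the archive and wrap its content stream as a tar stream
val tarFile = s3client.getObject(my_bucket, my_tar_file)
val tarInputStream = new TarArchiveInputStream(tarFile.getObjectContent)
tarInputStream.getNextTarEntry() // <-- error thrown on this line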
Error:
java.io.IOException: Error detected parsing the header
at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getNextTarEntry(TarArchiveInputStream.java:240)
... 52 elided
Caused by: java.lang.IllegalArgumentException: Invalid byte 48 at offset 7 in '00755{NUL}00' len=8
at org.apache.commons.compress.archivers.tar.TarUtils.parseOctal(TarUtils.java:127)
at org.apache.commons.compress.archivers.tar.TarUtils.parseOctalOrBinary(TarUtils.java:171)
at org.apache.commons.compress.archivers.tar.TarArchiveEntry.parseTarHeader(TarArchiveEntry.java:935)
at org.apache.commons.compress.archivers.tar.TarArchiveEntry.parseTarHeader(TarArchiveEntry.java:924)
at org.apache.commons.compress.archivers.tar.TarArchiveEntry.<init>(TarArchiveEntry.java:328)
at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getNextTarEntry(TarArchiveInputStream.java:238)
Does anyone know the correct way to extract part of a tar file on S3 in Spark?
In your case, you are passing the object content as a plain InputStream. My suggestion is to wrap it in a GZIPInputStream first and then read the entries:
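import java.util.zip.GZIPInputStream
import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream}

def readEntries(tarInputStream: TarArchiveInputStream): TarArchiveEntry = {
  var currentEntry = Option(tarInputStream.getNextTarEntry())
  // Assumption: return the first regular-file entry, skipping directories.
  // A functional approach (foldLeft, reduce, recursion) works as well as this while loop.
  while (currentEntry.exists(_.isDirectory)) {
    currentEntry = Option(tarInputStream.getNextTarEntry())
  }
  currentEntry.getOrElse(throw new java.io.IOException("No file entry found in archive"))
}

// Before: new TarArchiveInputStream(tarFile.getObjectContent)
val tarInputStream = new TarArchiveInputStream(new GZIPInputStream(tarFile.getObjectContent))
val entry: TarArchiveEntry = readEntries(tarInputStream)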
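The question does not say whether the archive is actually gzip-compressed, so it may be safer to check before wrapping. The following is a minimal sketch that peeks at the first two bytes for the gzip magic number (0x1f 0x8b) and only then applies GZIPInputStream. The names `GzipSniffer` and `maybeGunzip` are hypothetical and not part of any library.

```scala
import java.io.{BufferedInputStream, InputStream}
import java.util.zip.GZIPInputStream

object GzipSniffer {
  // Hypothetical helper: decompresses gzip input, passes anything else through unchanged.
  def maybeGunzip(raw: InputStream): InputStream = {
    val buffered = new BufferedInputStream(raw)
    buffered.mark(2)            // remember the start so the peeked bytes can be re-read
    val b0 = buffered.read()
    val b1 = buffered.read()
    buffered.reset()            // rewind to the marked position
    // gzip streams start with the magic bytes 0x1f 0x8b
    if (b0 == 0x1f && b1 == 0x8b) new GZIPInputStream(buffered) else buffered
  }
}
```

With this in place, the stream could be built as `new TarArchiveInputStream(GzipSniffer.maybeGunzip(tarFile.getObjectContent))`, which should behave the same for .tar and .tar.gz objects.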
You can find examples of how to use TarArchiveInputStream here
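To get from a tar entry to a Spark DataFrame, one option is to copy the entry to a file first, because the Parquet reader needs a path rather than a stream. The sketch below is only an illustration under that assumption: `TarExtract`, `extractFirst`, and the `wanted` predicate are hypothetical names, and the input stream is expected to be already decompressed if the archive is gzipped.

```scala
import java.io.InputStream
import java.nio.file.{Files, Path, StandardCopyOption}
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream

object TarExtract {
  // Hypothetical helper: copies the first regular-file entry whose name satisfies
  // `wanted` into a temp file and returns its path, or None if nothing matches.
  def extractFirst(in: InputStream, wanted: String => Boolean): Option[Path] = {
    val tar = new TarArchiveInputStream(in)

    @annotation.tailrec
    def loop(): Option[Path] = tar.getNextTarEntry() match {
      case null => None // end of archive
      case e if e.isFile && wanted(e.getName) =>
        val tmp = Files.createTempFile("tar-entry-", ".parquet")
        // The tar stream is positioned at this entry's data and reports EOF at its end
        Files.copy(tar, tmp, StandardCopyOption.REPLACE_EXISTING)
        Some(tmp)
      case _ => loop() // directory or non-matching entry: keep scanning
    }

    try loop() finally tar.close()
  }
}
```

Assuming a SparkSession named `spark`, the result could then be loaded with something like `TarExtract.extractFirst(tarFile.getObjectContent, _.endsWith(".parquet")).map(p => spark.read.parquet(p.toUri.toString))`. A local temp file is only visible on the machine that wrote it, so on a multi-node cluster the extracted file would likely need to be copied to shared storage such as S3 or HDFS before reading.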