我们如何更改spark输出的文件名[sparkJava]

问题描述 投票:0回答:1

我想更改 Spark 写入 AWS S3 时的输出文件名,有什么办法可以做到?

apache-spark spark-java
1个回答
0
投票

请看下面的代码,它同时适用于 HDFS 和 S3。

创建的 rename 函数会更改文件的路径和名称;如果目录中包含多个文件,则在文件名末尾附加序号,例如 json_data_1.json。

import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
import org.apache.hadoop.fs._

// Implicitly adapts Hadoop's RemoteIterator to a standard Scala Iterator so that
// collection combinators (toList, filter, map, ...) can be used on FileSystem listings.
implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] =
    new Iterator[T] {
      // Delegate both operations straight through to the wrapped Hadoop iterator.
      override def hasNext: Boolean = remoteIterator.hasNext
      override def next(): T = remoteIterator.next()
    }

import java.net.URI

// Resolves a Hadoop FileSystem handle for the scheme of the given path string
// (e.g. hdfs:// or s3://), using the active Spark session's Hadoop configuration.
// NOTE(review): relies on a `spark` SparkSession being in scope (REPL / notebook).
def fs(path: String) = {
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    FileSystem.get(URI.create(path), hadoopConf)
}

// Renames every data file under `path` (searched recursively) to `name_<index>.<ext>`,
// skipping Spark's _SUCCESS marker file. Works for both HDFS and S3 paths.
// Returns one Boolean per renamed file: true if the underlying FileSystem.rename succeeded.
def rename(path: String,name: String) = {
    // Hoist a single FileSystem handle: every path returned by listFiles shares the
    // scheme of `path`, so re-resolving a handle per file (as before) is wasted work.
    val fileSystem = fs(path)
    fileSystem
    .listFiles(new Path(path),true)
    .toList
    .filter(_.isFile)
    .map(_.getPath)
    .filterNot(_.toString.contains("_SUCCESS"))
    .zipWithIndex
    .map { case (file, idx) =>
      // Derive the extension from the file NAME after the LAST dot. The original
      // `toString.split("\\.")(1)` ran on the full path and took index 1, which breaks
      // when the path contains extra dots (e.g. /tmp/a.b/part-0.json) and throws on
      // extension-less files. `suffix` keeps the dot, or is empty when there is none.
      val fileName = file.getName
      val dotAt = fileName.lastIndexOf('.')
      val suffix = if (dotAt >= 0) fileName.substring(dotAt) else ""
      fileSystem.rename(file, new Path(s"${file.getParent}/${name}_${idx}$suffix"))
    }
}
scala> df.repartition(5).write.format("json").mode("overwrite").save("/tmp/samplea/")

scala> "ls -ltr /tmp/samplea/".!
total 8
-rw-r--r--  1 vn50ftc  wheel    0 Jun  6 13:57 part-00000-607ffd5e-7d28-4331-9a69-de36254c80b1-c000.json
-rw-r--r--  1 vn50ftc  wheel  282 Jun  6 13:57 part-00001-607ffd5e-7d28-4331-9a69-de36254c80b1-c000.json
-rw-r--r--  1 vn50ftc  wheel    0 Jun  6 13:57 _SUCCESS
res191: Int = 0

scala> rename("/tmp/samplea","json_data")
res193: List[Boolean] = List(true, true)

scala> "ls -ltr /tmp/samplea/".!
total 8
-rw-r--r--  1 vn50ftc  wheel    0 Jun  6 13:57 json_data_0.json
-rw-r--r--  1 vn50ftc  wheel  282 Jun  6 13:57 json_data_1.json
-rw-r--r--  1 vn50ftc  wheel    0 Jun  6 13:57 _SUCCESS
res194: Int = 0

© www.soinside.com 2019 - 2024. All rights reserved.