Cannot create a CSV with a Spark dataframe and Scala; instead a folder with ".csv" in its name is created


I am unable to write or create a CSV file with a Spark dataframe; instead it creates a directory for me. Here is my code:

package com.package.dssupplier

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, concat_ws}

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object DataSupplier extends Serializable {
  private val s3_input_loc: String = sys.env.getOrElse("S3_INPUT_PATH", "s3a://bucketname/data/export-db/dev/")

  def main(args: Array[String]): Unit = {
    println("DataSupplier Main started....................")
    val sqlContext = SparkSession.builder().appName("DataSupplier")
      .config("spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
      .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
      .config("spark.sql.files.overwrite", "true")
      .master("local")
      .getOrCreate()
    sqlContext.sparkContext.setLogLevel("WARN")

    val latestDF = sqlContext.read.text(s3_input_loc + "latest.txt").toDF("timestamp").select("timestamp")
    val latest = latestDF.first().getString(0)
    val dataPath = s3_input_loc + "/" + latest + "/dta/1/"

    sqlContext.read.parquet(dataPath).createOrReplaceTempView("dataTable")

    val dataQuery = "SELECT primaryid FROM dataTable WHERE name = 'John' limit 10"
    val dataResults = sqlContext.sql(dataQuery)
    dataResults.createOrReplaceTempView("dataResults")
    dataResults.show(100)

    val timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))
    val outputFileName = s"file_${timestamp}.csv"

    dataResults
      .withColumn("primaryid", concat_ws("$", col("primaryid")))
      .write.option("delimiter", "\t").option("header", "true")
      .mode(SaveMode.Overwrite)
      .format("com.databricks.spark.csv")
      .save(path =  "C:\\Users\\name\\output.csv")

    sqlContext.stop()
  }
}

But instead of creating a csv file, it creates a folder:

output.csv/_temporary\0
and there is no csv file inside. How can I fix this? Thanks.

scala apache-spark pyspark apache-spark-sql
1 Answer

When Spark writes output it always writes a folder, so you first need to repartition to 1 so that only a single partition file ends up in that folder. After that, some OS-level file operations are needed to move the file out of the subfolder to the path you actually want. Unfortunately I only have the code in Python/PySpark, but it gives you an idea of how to do it:


import os
import shutil

from pyspark.sql import DataFrame


def save_as_one_csv_file(
    data: DataFrame,
    output_file: str,
    null_value: str = "null",
    sep: str = ",",
    empty_value: str = "§§",
):
    """
    Saves a Spark dataframe as a single csv file.

    * Fills up null values with the string "null", as applications like QlikSense
      cannot differentiate between empty strings and null values in csv data.
    * Uses separator ",", and "§" for quoting and empty strings, as this is a
      rarely used character and Qlik has problems reading quoted data.
    * Ensures that the final output is a csv file at the given output path,
      not a folder as written by Spark.

    :param data: The Spark dataframe to write.
    :param output_file: The output path for writing the csv file.
    :param null_value: Replacement string written in place of null values.
    :param sep: Column separator.
    :param empty_value: Replacement string written in place of empty strings.
    """
    # get_local_path is a helper of the original author (not shown); presumably it
    # resolves output_file to a path on the local filesystem.
    localpath = get_local_path(output_file)

    # Write a single partition so the temporary folder holds exactly one csv part file.
    (
        data.repartition(1)
        .write.mode("overwrite")
        .option("header", True)
        .option("sep", sep)
        .option("quote", "§")
        .option("emptyValue", empty_value)
        .option("nullValue", null_value)
        .option("escape", "§")
        .csv(output_file + "_temp")
    )

    # Move the single part file from the temporary folder to the requested file path,
    # then remove the temporary folder.
    file = [file for file in os.listdir(localpath + "_temp") if file.endswith(".csv")][0]
    os.replace(os.path.join(localpath + "_temp", file), localpath)

    tmp_file_path = localpath + "_temp"
    shutil.rmtree(tmp_file_path)
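
Since the question is in Scala, here is a rough sketch of the same repartition-then-move idea using the Hadoop FileSystem API instead of os/shutil. It is only an illustration, not tested code; the method name saveAsOneCsvFile, the part-file glob, and the use of the default filesystem from the Spark Hadoop configuration are assumptions.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}

object SingleCsvWriter {
  // Writes the dataframe as one csv file at outputFile instead of a Spark folder.
  // Assumes outputFile does not already exist and its parent directory does.
  def saveAsOneCsvFile(spark: SparkSession, data: DataFrame, outputFile: String): Unit = {
    val tempDir = outputFile + "_temp"

    // One partition => the temp folder contains exactly one part-*.csv file.
    data.repartition(1)
      .write.mode("overwrite")
      .option("header", "true")
      .csv(tempDir)

    // Locate the part file, move it to the requested path, then clean up.
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val partFile = fs.globStatus(new Path(tempDir + "/part-*.csv"))(0).getPath
    fs.rename(partFile, new Path(outputFile))
    fs.delete(new Path(tempDir), true)
  }
}

Note that FileSystem.rename generally fails if the target path already exists, so delete any previous output.csv before rerunning, and keep in mind that with your local master the move happens on the local filesystem.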
