spark在S3上的分区内创建分区

问题描述 投票:2回答:1

我有下面的标签分隔的样本数据集。

col1  period  col3  col4  col5  col6  col7  col8  col9  col10 col11 col12 col13 col14 col15 col16 col17 col18 col19 col20 col21 col22
ASSDF 202001  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
ASSDF 202002  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
ASSDF 202003  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
ASSDF 202004  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
...
...
ASSDF 202312  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99

我在这个数据上运行一些转换和最终数据是在火花数据集 "DS1". 之后,我写的数据集到s3的 "期间 "分区。因为我想在s3文件中的周期,我创建另一列 "datasetPeriod "从周期列。

我的Scala函数来保存TSV数据集。

def saveTsvDataset(dataframe: DataFrame, outputFullPath: String, numPartitions: Integer, partitionCols: String*): Unit = {
    dataframe
      .repartition(numPartitions)
      .write
      .partitionBy(partitionCols:_*)
      .mode(SaveMode.Overwrite)
      .option("sep", "\t")
      .csv(outputFullPath)
  }

在s3上保存数据集的Scala代码。为s3上的分区添加新列datasetPeriod。

 saveTsvDataset(
      DS1.withColumn("datasetPeriod",$"period")
      , "s3://s3_path"
      , 100
      , "period"
    )

现在,我的问题是我有一个从202001到202312的周期,当我在s3上写上 "datasetPeriod "的分区时,有时它会在分区内创建任意周期的分区。所以这发生在任何时期都是随机的。我从来没有见过这种情况发生在多个时期。它创建的路径是这样的 "s3://s3_path/datasetPeriod=202008/datasetPeriod=202008".

scala apache-spark amazon-s3 apache-spark-sql apache-spark-dataset
1个回答
0
投票

你已经有一个 period 列的数据框架中。所以不需要再创建一个新的重复的 datasetPeriod 列。

当你把DataFrame写到一个 s3://../parentFolder 使用 .partitionBy("period") 它创建的文件夹如下。

df.write.partitionBy("period").csv("s3://../parentFolder/")
s3://.../parentFolder/period=202001/
s3://.../parentFolder/period=202002/
s3://.../parentFolder/period=202003/
...
s3://.../parentFolder/period=202312/

在读取数据的时候,只需要提到路径,直到... parentFolder 只,将自动改为 period 作为其中一列。

val df = spark.read.csv("s3://../parentFolder/")
//df.schema will give you `period` as one of the column
df.printSchema
root
 |-- col1: string (nullable = true)
 |-- .... //other columns go here
 |-- period: string (nullable = true)

也就是说,不管你得到的是什么多分区里面的分区列,都只是由于你在使用partitionBy写数据时使用了错误的路径。

© www.soinside.com 2019 - 2024. All rights reserved.