Sample name.csv data:
Name, ,Age, ,Class,
Diwakar,, ,25,, ,12,
, , , , ,
Prabhat, ,27, ,15,
Zyan, ,30, ,17,
Jack, ,35, ,21,
Reading the csv file:
names = spark.read.csv("name.csv", header="true", inferSchema="true")
names.show()
With this as the output, we are losing some data:
+-------+----+---+---+-----+----+
|   Name|   1|Age|  3|Class| _c5|
+-------+----+---+---+-----+----+
|Diwakar|null|   | 25| null|    |
|       |    |   |   |     |null|
|Prabhat|    | 27|   |   15|null|
|   Zyan|    | 30|   |   17|null|
|   Jack|    | 35|   |   21|null|
+-------+----+---+---+-----+----+
I want to get the output shown below:
+-------+---+---+---+-----+----+
|   Name|  1|Age|  3|Class| _c5|
+-------+---+---+---+-----+----+
|Diwakar|   | 25|   |   12|null|
|       |   |   |   |     |null|
|Prabhat|   | 27|   |   15|null|
|   Zyan|   | 30|   |   17|null|
|   Jack|   | 35|   |   21|null|
+-------+---+---+---+-----+----+
We can do this by defining a schema with the same number of columns as the CSV file, reading the CSV with that schema, and then using when/otherwise to recover the data for the Age and Class columns.
Example:
from pyspark.sql.functions import *
from pyspark.sql.types import *
# define a schema with the same number of columns as the csv file
sch = StructType([
    StructField("Name", StringType(), True),
    StructField("1", StringType(), True),
    StructField("Age", StringType(), True),
    StructField("3", StringType(), True),
    StructField("Class", StringType(), True),
    StructField("_c5", StringType(), True),
    StructField("_c6", StringType(), True)
])
# read the csv file with the schema
df = spark.read.schema(sch).option("header", True).csv("name.csv")
# when Age is blank, the real value landed in the spilled column "3";
# when Class is null/"null", the real value landed in the spilled column "_c6"
df = (df
      .withColumn('Age', when(length(trim(col('Age'))) == 0, col('3')).otherwise(col('Age')))
      .withColumn('1', lit(""))
      .withColumn('3', lit(""))
      .withColumn('Class',
                  when((col('Class').isNull()) | (lower(col('Class')) == 'null'), col('_c6'))
                  .when(length(trim(col('Class'))) == 0, lit("null"))
                  .otherwise(col('Class')))
      .withColumn('_c5', lit("null"))
      .drop("_c6"))
df.show()
#+-------+---+---+---+-----+----+
#|   Name|  1|Age|  3|Class| _c5|
#+-------+---+---+---+-----+----+
#|Diwakar|   | 25|   |   12|null|
#|       |   |   |   | null|null|
#|Prabhat|   | 27|   |   15|null|
#|   Zyan|   | 30|   |   17|null|
#|   Jack|   | 35|   |   21|null|
#+-------+---+---+---+-----+----+