Pyspark:将所有压缩的csvs合并到python中的一个csv中

问题描述 投票:0回答:1

如果我有压缩csvs形式的大数据,我如何将它组合成一个单独的csv文件(压缩输出与否无关紧要)?

我正在将它读入一个火花数据帧,但后来我陷入了如何连接pyspark Dataframes。

下面是我运行循环的代码,并希望为每个循环运行附加Dataframe:

        schema=StructType([])
        result = spark.createDataFrame(sc.emptyRDD(), schema)
        for day in range(1,31):
            day_str = str(day) if day>=10 else "0"+str(day)
            print 'Ingesting %s' % day_str
            df = spark.read.format("csv").option("header", "false").option("delimiter", "|").option("inferSchema", "true").load("s3a://key/201811%s" % (day_str))
            result = result.unionAll(df)

        result.write.save("s3a://key/my_result.csv", format='csv')

这给了我错误AnalysisException: u"Union can only be performed on tables with the same number of columns, but the first table has 0 columns and the second table has 1 columns;;\n'Union\n:- LogicalRDD\n+- Relation[_c0#75] csv\n"。任何人都可以帮我,我该怎么办?

python pyspark
1个回答
0
投票

这对我有用:

result=spark.createDataFrame(sc.emptyRDD(), schema_mw)

for day in range(1,31):
    day_str = str(day) if day>=10 else "0"+str(day)
    print 'Ingesting %s' % day_str

    df = spark.read.format("csv").option("header", "false").option("delimiter", ",").schema(schema_mw).load("s3a://bucket/201811%s" % (day_str))

    if result:
        result = result.union(df)
    else:
        result = df
result.repartition(1).write.save("s3a://bucket/key-Compiled", format='csv', header=False)

但是,当我尝试在最后一步中将标头加载为true以进行重新分区时,标头将存储为一行。我不知道如何将这些标题添加为标题而不是作为一行添加。

© www.soinside.com 2019 - 2024. All rights reserved.