How to save each row of a DataFrame as a separate HDFS file using pyspark

Problem description · votes: 0 · answers: 1

I have a Ctrl-A delimited file with the following header:

"file_name","file_metadata","data_content","status","error_type","error_message"

I need to dump each record of the file to HDFS as its own file, into a directory like basepath/errortype_filename/file.json, where the content of each file is the data_content column.

Sample data:

>>> ff_df = ff_rdd.toDF(['file_name','file_metadata','data_content','status','error_type','error_message'])
>>> ff_df.show()
+--------------+-------------+--------------------+------+-------------+--------------------+
|     file_name|file_metadata|        data_content|status|   error_type|       error_message|
+--------------+-------------+--------------------+------+-------------+--------------------+
|test_file.json|     metadata|{ "fruit": "Apple...|FAILED| INVALID_JSON|     could not parse|
|demo_file.json|     metadata|{ "fruit": "Apple...|FAILED|MISSING_RULES|No matching rules...|
+--------------+-------------+--------------------+------+-------------+--------------------+

Now I need these two rows written as two files in HDFS, under the folders /tmp/INVALID_JSON_test_file and /tmp/MISSING_RULES_demo_file respectively. I have written the pyspark code below, but it does not produce the desired result. Please help.

def write_file(line):
    tokens = line.split("\x01")
    file_name = tokens[0]
    error_type = tokens[4]
    content = tokens[2]
    # build the target directory path for this record
    directory_name = basePath + "/" + error_type + "/" + file_name
    return directory_name

# get the file content
ff_rdd = sc.textFile("/tmp/pyspark1.txt").map(lambda line: line.split("\x01"))
ff_df = ff_rdd.toDF(['file_name','file_metadata','data_content','status','error_type','error_message'])
content_df = ff_df.select("data_content")

file_path = sc.textFile("/tmp/pyspark1.txt").map(lambda line: write_file(line))
content_df.rdd.saveAsTextFile("file_path")


pyspark apache-spark-sql rdd
1 Answer
0 votes

Sample input:

+--------------+-------------+-------------------+------+-------------+-----------------+
|     file_name|file_metadata|       data_content|status|   error_type|    error_message|
+--------------+-------------+-------------------+------+-------------+-----------------+
|test_file.json|     metadata|{ "fruit": "Apple"}|FAILED| INVALID_JSON|  could not parse|
|demo_file.json|     metadata|   { "fruit": "Ab"}|FAILED|MISSING_RULES|No matching rules|
+--------------+-------------+-------------------+------+-------------+-----------------+
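For completeness, here is a minimal sketch of how the df_new DataFrame used in the snippets below can be built from the sample rows above (assuming an active SparkSession named spark; the values are taken from the table):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

rows = [
    ("test_file.json", "metadata", '{ "fruit": "Apple"}', "FAILED", "INVALID_JSON", "could not parse"),
    ("demo_file.json", "metadata", '{ "fruit": "Ab"}', "FAILED", "MISSING_RULES", "No matching rules"),
]
# column names match the header from the question
df_new = spark.createDataFrame(rows, ["file_name", "file_metadata", "data_content", "status", "error_type", "error_message"])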

First, we concatenate the error_type column and the file_name column (the base name only, without the extension) to create newColumn:

from pyspark.sql.functions import col, concat, lit, split

final_df = df_new.withColumn("newColumn", concat(col("error_type"), lit("_"), split("file_name", "\\.")[0]))

Running final_df.show(truncate=False) gives the following output:

+--------------+-------------+-------------------+------+-------------+-----------------+-----------------------+
|file_name     |file_metadata|data_content       |status|error_type   |error_message    |newColumn              |
+--------------+-------------+-------------------+------+-------------+-----------------+-----------------------+
|test_file.json|metadata     |{ "fruit": "Apple"}|FAILED|INVALID_JSON |could not parse  |INVALID_JSON_test_file |
|demo_file.json|metadata     |{ "fruit": "Ab"}   |FAILED|MISSING_RULES|No matching rules|MISSING_RULES_demo_file|
+--------------+-------------+-------------------+------+-------------+-----------------+-----------------------+

To get the directory structure in the required format, e.g. Base_directory/INVALID_JSON_test_file, we have to partition final_df by the newly created newColumn when writing.

We can write it out as follows:

final_df.select("data_content","newColumn").write.partitionBy("newColumn").save(FilePath)
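With this write, Spark creates one sub-directory per distinct value of newColumn. A sketch of the resulting layout, assuming FilePath is the hypothetical path /tmp/output:

/tmp/output/newColumn=INVALID_JSON_test_file/part-*.snappy.parquet
/tmp/output/newColumn=MISSING_RULES_demo_file/part-*.snappy.parquet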

By default this writes parquet files. I don't think the output can be written as a text file, because the text source does not accept multiple columns, and we need to keep newColumn alongside data_content since we are partitioning the DataFrame on newColumn.
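To inspect the contents of a single partition, its directory can be read back directly (a sketch, again assuming FilePath is /tmp/output):

part_df = spark.read.parquet("/tmp/output/newColumn=INVALID_JSON_test_file")
# the data files hold only the data_content column; newColumn lives in the directory name
part_df.show(truncate=False)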
