如何在一个字符串中读取整个文件

问题描述 投票:0回答:6

我想读取 pyspark.lf 中的 json 或 xml 文件。如果我的文件被分成多行

rdd= sc.textFile(json or xml) 

输入

{
" employees":
[
 {
 "firstName":"John",
 "lastName":"Doe" 
},
 { 
"firstName":"Anna"
  ]
}

输入分布在多行中。

预期输出

{"employees:[{"firstName:"John",......]}

如何使用 pyspark 在一行中获取完整文件?

json apache-spark apache-spark-sql
6个回答
10
投票

有3种方式(我发明了第3种,前两种是标准内置的Spark函数),这里的解决方案在PySpark中:

textFile、wholeTextFile 和带标签的 textFile(键 = 文件,值 = 文件中的 1 行。这是两种解析文件的给定方法之间的混合)。

1.) 文本文件

输入:

rdd = sc.textFile('/home/folder_with_text_files/input_file')

输出:包含 1 行文件作为每个条目的数组,即。 [第 1 行,第 2 行,...]

2.) 整个文本文件

输入:

rdd = sc.wholeTextFiles('/home/folder_with_text_files/*')

输出:元组数组,第一项是带有文件路径的“键”,第二项包含1个文件的全部内容,即。

[(u'文件:/home/folder_with_text_files/', u'file1_contents'), (u'文件:/home/folder_with_text_files/', file2_contents), ...]

3.)“标记”文本文件

输入:

import glob
from pyspark import SparkContext
SparkContext.stop(sc)
sc = SparkContext("local","example") # if running locally
sqlContext = SQLContext(sc)

for filename in glob.glob(Data_File + "/*"):
    Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)

输出:数组,每个条目包含一个元组,使用文件名作为键,值=文件的每一行。 (从技术上讲,使用这种方法,除了实际的文件路径名称之外,您还可以使用不同的密钥 - 也许是一种散列表示形式以保存内存)。 IE。

[('/home/folder_with_text_files/file1.txt', 'file1_contents_line1'),
 ('/home/folder_with_text_files/file1.txt', 'file1_contents_line2'),
 ('/home/folder_with_text_files/file1.txt', 'file1_contents_line3'),
 ('/home/folder_with_text_files/file2.txt', 'file2_contents_line1'),
  ...]

您还可以将其中任何一个重新组合为行列表:

Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

[('/home/folder_with_text_files/file1.txt', ['file1_contents_line1', 'file1_contents_line2','file1_contents_line3']),
 ('/home/folder_with_text_files/file2.txt', ['file2_contents_line1'])]

或者将整个文件重新组合回单个字符串(在此示例中,结果与从 WholeTextFiles 获得的结果相同,但从文件路径中删除了字符串“file:”。):

Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()


7
投票

这就是你在 scala 中要做的事情

rdd = sc.wholeTextFiles("hdfs://nameservice1/user/me/test.txt")
rdd.collect.foreach(t=>println(t._2))

7
投票

如果您的数据未按照

textFile
的预期形成在一行上,则使用
wholeTextFiles

这将为您提供整个文件,以便您可以将其解析为您想要的任何格式。


5
投票

“如何在一个字符串中读取整个 [HDFS] 文件 [在 Spark 中,用作 sql]”:

例如

// Put file to hdfs from edge-node's shell...

hdfs dfs -put <filename>

// Within spark-shell...

// 1. Load file as one string
val f = sc.wholeTextFiles("hdfs:///user/<username>/<filename>")
val hql = f.take(1)(0)._2

// 2. Use string as sql/hql
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val results = hiveContext.sql(hql)

2
投票

Python方式

rdd = spark.sparkContext.wholeTextFiles("hdfs://nameservice1/user/me/test.txt")
json = rdd.collect()[0][1]

0
投票

根据https://spark.apache.org/docs/latest/sql-data-sources-text.html,您可以阅读:

text_df = spark.read.text("your_path", wholetext=True)
text = text_df.first().value
© www.soinside.com 2019 - 2024. All rights reserved.