使用pyspark,如何将文件中一行中的多个JSON文档读取到数据帧中?

问题描述 投票:0回答:4
使用 Spark 2.3,我知道我可以读取这样的 JSON 文档文件:

{'key': 'val1'} {'key': 'val2'}

有了这个:

spark.json.read('filename')

当 JSON 文档之间没有换行符时,如何将以下内容读入数据帧?

以下是输入示例。

{'key': 'val1'}{'key': 'val2'}

需要明确的是,我期望一个包含两行的数据框(

frame.count() == 2

)。

apache-spark dataframe pyspark apache-spark-sql
4个回答
1
投票
请尝试 -

df = spark.read.json(["fileName1","fileName2"])

如果您想读取文件夹中的所有 json 文件,也可以这样做 -

df = spark.read.json("data/*json")
    

0
投票
正如@cricket_007上面建议的那样,你最好修复输入文件

如果您确定 json 对象中没有内联右大括号,您可以执行以下操作:

with open('myfilename', 'r') as f: txt = f.read() txt = txt.replace('}', '}\n') with open('mynewfilename', 'w') as f: f.write(txt)

如果键或值中确实有“}”,则任务会变得更困难,但使用正则表达式并非不可能。不过这似乎不太可能。


0
投票
我们使用 RDD-Api 解决了这个问题,因为我们找不到任何以内存高效的方式使用 Dataframe-API 的方法(我们总是遇到执行程序 OoM 错误)。

以下函数将逐步尝试解析 json 并从文件中

yield

处理后续 json(
来自这篇文章):

from functools import partial from json import JSONDecoder from io import StringIO def generate_from_buffer(buffer: str, chunk: str, decoder: JSONDecoder): buffer += chunk while buffer: try: result, index = decoder.raw_decode(buffer) yield result buffer = buffer[index:].lstrip() except ValueError: # Not enough data to decode, read more break return buffer def parse_jsons_file(jsons_str: str, buffer_size: int = 1024): decoder = JSONDecoder() buffer = '' file_obj = StringIO(jsons_str) for chunk in iter(partial(file_obj.read, buffer_size), ''): buffer = yield from generate_from_buffer(buffer, chunk, decoder) if buffer: raise ValueError("Invalid input: should be concatenation of json strings")
我们首先用

.format("text")

读取json:

df: DataFrame = ( spark .read .format("text") .option("wholetext", True) .load(data_source_path) )
然后使用上面的函数将其转换为 RDD,

flatMap

,最后将其转换回 Spark 数据帧。为此,您必须为文件中的单个 json 定义 
json_schema
,无论如何,这是一个很好的做法。

rdd_df = (df_rdd.map(lambda row: row["value"]) .flatMap(lambda jsons_string: parse_jsons_file(jsons_string)) .toDF(json_schema))
    

0
投票
改编自

https://stackoverflow.com/a/77198142

很久以前发的帖子,但我现在也遇到同样的情况。就我而言,我陷入了 VPC 流日志的困境,其中每组 logEntries 都是一个 JSON 对象,每个文件有许多没有分隔符。和你一样,我看到了一组 JSON 对象,例如 {...}{...}{...}...

我同意,更改数据是最好的。但是,要在不重写的情况下使用现有数据,仍然需要一个解决方案。

这是我所做的(通过 AWS Glue 使用 pyspark):

from awsglue.transforms import * from pyspark.context import SparkContext from awsglue.context import GlueContext import pyspark.sql.functions as F from pyspark.sql.functions import explode, split, col, from_json sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session # Each file is a row df = spark.read.text( 's3://xxxxx-omitted-cloudwatch-logs/vpcFlowLogs/2024/01/31/*/*', wholetext = True) # Here we split each row (file) looking for "}{". # Still one row, but now many columns, each a valid JSON string df = df.select(F.split(F.regexp_replace( col('value'), '\}\{', '\}\|\{'), '\|').alias('value')) # Here we explode the columns into rows. df = df.withColumn("json_str", explode(initial_df["value"])) df = df.drop("value") # Here we convert each row to a JSON object json_schema = spark.read.json(df.rdd.map(lambda row: row.xyz)).schema df = df.withColumn('json', from_json(col('json_str'), json_schema)) # Now we explode the underlying logEvents array df = df.select(col('json.logEvents').alias('logEvents')) df = df.withColumn("logEvents_expr", explode(df["logEvents"])) # Now we split each log event message into named columns df = df.withColumn("msg", df2["logEvents_expr"].getItem("message")) spl = split(df3["msg"], " ") df = df.withColumn("account_id", spl.getItem(1)) #df = df.withColumn("interface_id", spl.getItem(2)) df = df.withColumn("srcaddr", spl.getItem(3)) df = df.withColumn("dstaddr", spl.getItem(4)) #df = df.withColumn("srcport", spl.getItem(5)) #df = df.withColumn("dstport", spl.getItem(6)) #df = df.withColumn("protocol", spl.getItem(7)) #df = df.withColumn("packets", spl.getItem(8)) df = df.withColumn("total_bytes", spl.getItem(9)) #df = df.withColumn("start", spl.getItem(10)) #df = df.withColumn("end", spl.getItem(11)) #df = df.withColumn("action", spl.getItem(12)) #df = df.withColumn("log_status", spl.getItem(13)) # Get rid of unneeded columns df = df.drop("logEvents") df = df.drop("logEvents_expr") # Here I filter by account. df = df.where(df["account_id"] == "99999999999") # And those events downloaded to NAT df = df.where((df["dstaddr"] == "10.123.124.125")) # And sum the total bytes df.select(F.sum(df.total_bytes)).show()
    
© www.soinside.com 2019 - 2024. All rights reserved.