Spark can read the following into a data frame:

{'key': 'val1'}
{'key': 'val2'}

with this:

spark.read.json('filename')

How do I read the following into a data frame when there are no newlines between the JSON documents? Here is an example of the input:

{'key': 'val1'}{'key': 'val2'}

To be clear, I expect a data frame with two rows (frame.count() == 2).
You can pass multiple files at once:

df = spark.read.json(["fileName1", "fileName2"])

If you want to read all the JSON files in a folder, you can also do this:

df = spark.read.json("data/*.json")
If you are sure there are no inline closing braces within the JSON objects themselves, you could do the following:

with open('myfilename', 'r') as f:
    txt = f.read()

txt = txt.replace('}', '}\n')

with open('mynewfilename', 'w') as f:
    f.write(txt)
If the keys or values do contain '}', the task gets harder, though not impossible with regular expressions. It seems unlikely, though.
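Once the file has been rewritten, the standard newline-delimited reader applies. A minimal sketch, assuming the two-object input from the question (the single-quoted sample still parses because Spark's allowSingleQuotes option is on by default):

# Each line of the rewritten file is now one JSON document.
df = spark.read.json('mynewfilename')
assert df.count() == 2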
The following function will incrementally try to parse the JSON and yield the successive documents from the file (from this post):
from functools import partial
from json import JSONDecoder
from io import StringIO


def generate_from_buffer(buffer: str, chunk: str, decoder: JSONDecoder):
    buffer += chunk
    while buffer:
        try:
            result, index = decoder.raw_decode(buffer)
            yield result
            buffer = buffer[index:].lstrip()
        except ValueError:
            # Not enough data to decode, read more
            break
    return buffer


def parse_jsons_file(jsons_str: str, buffer_size: int = 1024):
    decoder = JSONDecoder()
    buffer = ''
    file_obj = StringIO(jsons_str)
    for chunk in iter(partial(file_obj.read, buffer_size), ''):
        buffer = yield from generate_from_buffer(buffer, chunk, decoder)
    if buffer:
        raise ValueError("Invalid input: should be concatenation of json strings")
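A quick local check of the parser (a sketch; note that Python's JSONDecoder requires double-quoted strings, unlike Spark's more lenient reader):

# Two concatenated documents with no separator between them.
docs = list(parse_jsons_file('{"key": "val1"}{"key": "val2"}'))
print(docs)  # [{'key': 'val1'}, {'key': 'val2'}]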
We first read the JSON with .format("text"):
from pyspark.sql import DataFrame

# One row per file; the entire file contents land in the "value" column.
df: DataFrame = (
    spark
    .read
    .format("text")
    .option("wholetext", True)
    .load(data_source_path)
)
Then we convert it to an RDD, flatMap over it with the function above, and finally convert it back to a Spark data frame. To do this you have to define a json_schema for the individual JSON documents in your file, which is good practice anyway.
rdd_df = (df.rdd.map(lambda row: row["value"])
          .flatMap(lambda jsons_string: parse_jsons_file(jsons_string))
          .toDF(json_schema))
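The snippet above assumes json_schema is already defined. For the two-document sample from the question, a minimal sketch (the single string field key is taken from the question's example):

from pyspark.sql.types import StructType, StructField, StringType

# Schema for documents of the form {"key": "..."}.
json_schema = StructType([
    StructField("key", StringType(), nullable=True),
])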
https://stackoverflow.com/a/77198142
Old post, but I'm facing the same situation now. In my case I'm stuck with VPC flow logs, where each batch of logEvents is a JSON object and each file holds many of them with no delimiter. Like you, I'm looking at a run of JSON objects, e.g. {...}{...}{...}. I agree that changing the data is best; still, to use the existing data without rewriting it, a solution is needed.

Here's what I did (using pyspark via AWS Glue):
from awsglue.transforms import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
import pyspark.sql.functions as F
from pyspark.sql.functions import explode, split, col, from_json
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Each file is a row
df = spark.read.text(
    's3://xxxxx-omitted-cloudwatch-logs/vpcFlowLogs/2024/01/31/*/*',
    wholetext=True)
# Here we split each row (file) looking for "}{".
# Still one row, but now many columns, each a valid JSON string
df = df.select(F.split(F.regexp_replace(
    col('value'),
    r'\}\{',
    r'\}\|\{'),
    r'\|').alias('value'))
# Here we explode the array elements into rows.
df = df.withColumn("json_str", explode(df["value"]))
df = df.drop("value")
# Here we convert each row to a JSON object, inferring the schema
# by letting Spark parse the JSON strings once.
json_schema = spark.read.json(df.rdd.map(lambda row: row.json_str)).schema
df = df.withColumn('json', from_json(col('json_str'), json_schema))
# Now we explode the underlying logEvents array
df = df.select(col('json.logEvents').alias('logEvents'))
df = df.withColumn("logEvents_expr", explode(df["logEvents"]))
# Now we split each log event message into named columns
df = df.withColumn("msg", df2["logEvents_expr"].getItem("message"))
spl = split(df3["msg"], " ")
df = df.withColumn("account_id", spl.getItem(1))
#df = df.withColumn("interface_id", spl.getItem(2))
df = df.withColumn("srcaddr", spl.getItem(3))
df = df.withColumn("dstaddr", spl.getItem(4))
#df = df.withColumn("srcport", spl.getItem(5))
#df = df.withColumn("dstport", spl.getItem(6))
#df = df.withColumn("protocol", spl.getItem(7))
#df = df.withColumn("packets", spl.getItem(8))
df = df.withColumn("total_bytes", spl.getItem(9))
#df = df.withColumn("start", spl.getItem(10))
#df = df.withColumn("end", spl.getItem(11))
#df = df.withColumn("action", spl.getItem(12))
#df = df.withColumn("log_status", spl.getItem(13))
# Get rid of unneeded columns
df = df.drop("logEvents")
df = df.drop("logEvents_expr")
# Here I filter by account.
df = df.where(df["account_id"] == "99999999999")
# And those events downloaded to NAT
df = df.where((df["dstaddr"] == "10.123.124.125"))
# And sum the total bytes
df.select(F.sum(df.total_bytes)).show()
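One optional refinement (a sketch, not part of the original run): total_bytes is still a string at this point, so casting explicitly avoids relying on Spark's implicit string-to-number coercion during the sum:

# Cast before aggregating instead of letting Spark coerce the string column.
df = df.withColumn("total_bytes", col("total_bytes").cast("long"))
df.select(F.sum("total_bytes")).show()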