我目前面临一个问题,EventHub 文件以 avro 格式登陆到登陆区域。之后,自动加载程序拾取文件并将它们按原样放入表中,这意味着正文以字节为单位。流中添加了另一列,该流正在将 Body 解码为字符串。表中的列现在包含带有字符串的行,需要解析所有行并将其转换为 json。字符串看起来像这样
@cc300://imm/cm#//c.Process/p.TriggerAck/v,@2222,@cc300://imm/cm#//c.AcousticAlarm1/p.sv_bAlarm/v,@cc300://imm/cm#//c.AirValve1/p.sv_bActivatedInSequence/v
444444,4.73,0
预期产量
{
"@cc300://imm/cm#//c.Process/p.TriggerAck/v,@2222,@cc300://imm/cm#//c.AcousticAlarm1/p.sv_bAlarm/v": 444444,
"@2222": 4.73,
"@cc300://imm/cm#//c.AirValve1/p.sv_bActivatedInSequence/v": 0
}
每个列名分别属于一个值。我想将所有这些字符串转换为 json 并将它们存储在表中的行中。
我对 pyspark 和 Databricks 相当陌生,所以我想知道我是否可以获得任何类型的帮助或推动正确的方向,以了解如何将所有这些转换为 json 并使其达到最佳状态,因为这是对数据。
非常感谢任何帮助!
我目前拥有的代码没有多大帮助
import pandas as pd
from io import StringIO
data = spark.sql("SELECT decoded FROM catalog.schema.table")
first_row = data.head()
decoded_body = first_row[0]
df = pd.read_csv(StringIO(decoded_body))
即使我不只选择一行,这也只能处理第一行。我的想法是将其转换为正确的表,然后转换为 json,但我确信有更好的解决方案。
您可以使用Databricks中的用户定义函数来实现您的目标。有一个函数(from_csv 函数 - Azure Databricks - Databricks SQL | Microsoft Learn)可以从 CSV 字符串读取数据。由于您有动态架构,因此可以使用下面的代码。
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
from pyspark.sql.types import *
from io import StringIO
import pandas as pd
def get_json(row):
import json
df = pd.read_csv(StringIO(row), dtype=float)
res = {'value': json.loads(df.to_json(orient='records'))[0]}
return res
json_udf = udf(get_json, StructType([
StructField('value', MapType(StringType(), FloatType()))
]))
还有
df = spark.sql("SELECT decoded FROM table")
df.withColumn("json", json_udf(F.col('decoded'))).select("json.value").display()
输出:
价值 |
---|
{"@cc300://imm/cm#//c.AirValve1/p.sv_bActivatedInSequence/v":null,"@cc300://imm/cm#//c.AcousticAlarm1/p.sv_bAlarm/v": 0,"@2222":4.73,"@cc300://imm/cm#//c.Process/p.TriggerAck/v":444444} |
这里,对于每条记录,它都会以 pandas 的形式读取 CSV 数据,然后将其转换为 JSON。您可以根据您的数据调整数据类型。