EventHub 解析解码体

问题描述 投票:0回答:1

我目前面临一个问题,EventHub 文件以 avro 格式登陆到登陆区域。之后,自动加载程序拾取文件并将它们按原样放入表中,这意味着正文以字节为单位。流中添加了另一列,该流正在将 Body 解码为字符串。表中的列现在包含带有字符串的行,需要解析所有行并将其转换为 json。字符串看起来像这样

@cc300://imm/cm#//c.Process/p.TriggerAck/v,@2222,@cc300://imm/cm#//c.AcousticAlarm1/p.sv_bAlarm/v,@cc300://imm/cm#//c.AirValve1/p.sv_bActivatedInSequence/v
444444,4.73,0

预期产量

{
"@cc300://imm/cm#//c.Process/p.TriggerAck/v,@2222,@cc300://imm/cm#//c.AcousticAlarm1/p.sv_bAlarm/v": 444444,
"@2222": 4.73,
"@cc300://imm/cm#//c.AirValve1/p.sv_bActivatedInSequence/v": 0
}

每个列名分别属于一个值。我想将所有这些字符串转换为 json 并将它们存储在表中的行中。

我对 pyspark 和 Databricks 相当陌生,所以我想知道我是否可以获得任何类型的帮助或推动正确的方向,以了解如何将所有这些转换为 json 并使其达到最佳状态,因为这是对数据。

非常感谢任何帮助!

我目前拥有的代码没有多大帮助

import pandas as pd
from io import StringIO

data = spark.sql("SELECT decoded FROM catalog.schema.table")

first_row = data.head()

decoded_body = first_row[0]

df = pd.read_csv(StringIO(decoded_body))

即使我不只选择一行,这也只能处理第一行。我的想法是将其转换为正确的表,然后转换为 json,但我确信有更好的解决方案。

python apache-spark pyspark azure-databricks azure-eventhub
1个回答
0
投票

您可以使用Databricks中的用户定义函数来实现您的目标。有一个函数(from_csv 函数 - Azure Databricks - Databricks SQL | Microsoft Learn)可以从 CSV 字符串读取数据。由于您有动态架构,因此可以使用下面的代码。

from pyspark.sql.functions import udf
import pyspark.sql.functions as F
from pyspark.sql.types import *
from io import StringIO
import pandas as pd

def get_json(row):
    import json
    df = pd.read_csv(StringIO(row), dtype=float)
    res = {'value': json.loads(df.to_json(orient='records'))[0]}
    return res

json_udf = udf(get_json, StructType([
    StructField('value', MapType(StringType(), FloatType()))
]))

还有

df = spark.sql("SELECT decoded FROM table")
df.withColumn("json", json_udf(F.col('decoded'))).select("json.value").display()

输出:

价值
{"@cc300://imm/cm#//c.AirValve1/p.sv_bActivatedInSequence/v":null,"@cc300://imm/cm#//c.AcousticAlarm1/p.sv_bAlarm/v": 0,"@2222":4.73,"@cc300://imm/cm#//c.Process/p.TriggerAck/v":444444}

这里,对于每条记录,它都会以 pandas 的形式读取 CSV 数据,然后将其转换为 JSON。您可以根据您的数据调整数据类型。

© www.soinside.com 2019 - 2024. All rights reserved.