如何使用 Py-spark 从 Fabric 中的 Azure 事件中心读取压缩数据

问题描述 投票:0回答:1

我将压缩数据发送到事件中心,以克服 Azure 事件中心中的 1 MB 硬限制。 我还必须在 Py-spark 中阅读此内容并更新增量表。

发送到事件中心的压缩数据在 Py-spark 流中为空。怎么读?

这就是我从事件中心阅读的方式

 df_stream_body = df_stream.select(F.from_json(F.col("body").cast("string"), message_schema).alias("Payload"))

这就是我将数据发送到事件中心的方式。 event_data_batch =等待生产者.create_batch()

    # Add events to the batch.
    body = '{"id": "90", "firstName": "Sudarshan70","middleName": "Kumar2","lastName": "Thakur2"}'  
    
    # Compress the JSON string using the gzip algorithm.
    compressed_body = gzip.compress(body.encode('utf-8'))

    # Encode the compressed JSON string to base64.
    encoded_compressed_body = base64.b64encode(compressed_body)
    

    event_data_batch.add(EventData(encoded_compressed_body))

我尝试使用 gzip 选项进行阅读,但它给了我 null 。

df_stream  = spark.readStream.format("eventhubs")\
  .options(**ehConf)\
  .option("compression", "gzip") \
  .load() 
pyspark azure-eventhub fabric
1个回答
0
投票

您需要对列

body
进行解压缩,选项
.option("compression", "gzip")
适用于压缩文件,而不是列上的压缩数据。

因此,需要创建用户定义的函数来解压缩和解码数据。 使用下面的代码。

UDF

import  pyspark.sql.functions as F
from pyspark.sql.types import StringType
import gzip,base64,json


def unZip(binary_string):
    return gzip.decompress(base64.b64decode(binary_string)).decode('utf-8')

unzip = F.udf(unZip, StringType())

接下来,使用解压缩的数据创建新列。

df.withColumn("bd", unzip(F.col("body").cast('string'))).display()

输出:

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.