Python/PySpark - 以编程方式将 json_string 列发送到 REST API

问题描述 投票:0回答:1

我有一个数据帧,我使用 Spark Structured Streaming .readStream() 进行流式传输:

身份证 json_数据
123 {颜色:“红色”,值:“#f00”}
125 {颜色:“蓝色”,值:“#f45”}

我想将每行中的每个 json_data 作为 json 有效负载发送到 Rest API。最好的方法是什么?

我知道 Databricks 有一个数据框编写器(https://docs.databricks.com/en/structed-streaming/foreach.html),但不清楚我会如何做到这一点。

我需要将列写入Python字典吗?

有点困惑这个脚本如何工作,流数据进入并附加到数据帧上,但我需要 json_data 列(存储为字符串)作为有效负载。

pyspark databricks spark-structured-streaming
1个回答
0
投票

试试这个:

def export_to_api(microBatchOutputDF, batchId):
  microBatchOutputDF_array = microBatchOutputDF.collect()
  for row in microBatchOutputDF_array:
    json_content = row.json_data
    # Enter solution for exporting to api
    <>

# Write the output of a streaming aggregation query into Delta table
(streaming_data.writeStream
  .format("delta")
  .foreachBatch(export_to_api)
)
© www.soinside.com 2019 - 2024. All rights reserved.