使用PySpark结构化流,如何通过WebSocket将处理后的数据发送到客户端

问题描述 投票:0回答:1

我在应用程序中使用 PySpark 结构化流,其中使用

readStream
从 Apache Iceberg 表中读取附加数据。在 PySpark 框架中处理数据后,我想使用 Python 中的 websockets 库将处理后的数据发送到 WebSocket 客户端。

我尝试使用

.foreach()
,但我无法在其中使用await

python apache-spark pyspark spark-structured-streaming
1个回答
0
投票

我找到了解决方案:

  1. 定义异步处理函数:
async def process(df, df_id, websocket):
    # data processing
    await websocket.send(data)
  1. 创建一个包装函数来处理每批流数据:
def process_wrapper(batch_df, batch_id, websocket):
    asyncio.run(process(batch_df, batch_id, instance))
  1. 设置流式查询:
query = df \
       .writeStream \
       .outputMode("append") \
       .foreachBatch(partial(process_wrapper, websocket=websocket)) \
       .trigger(processingTime="10 seconds") \
       .start()
© www.soinside.com 2019 - 2024. All rights reserved.