我在应用程序中使用 PySpark 结构化流,其中使用
readStream
从 Apache Iceberg 表中读取附加数据。在 PySpark 框架中处理数据后,我想使用 Python 中的 websockets 库将处理后的数据发送到 WebSocket 客户端。
我尝试使用
.foreach()
,但我无法在其中使用await
我找到了解决方案:
async def process(df, df_id, websocket):
# data processing
await websocket.send(data)
def process_wrapper(batch_df, batch_id, websocket):
asyncio.run(process(batch_df, batch_id, instance))
query = df \
.writeStream \
.outputMode("append") \
.foreachBatch(partial(process_wrapper, websocket=websocket)) \
.trigger(processingTime="10 seconds") \
.start()