我有超过1000万的海量数据,这些数据可以通过数据帧“分区键”中的键进行分区
我想发布这个分区键组中的数据,有什么办法吗?
sample data
from pyspark.sql.functions import *
#sample data
df=sqlContext.createDataFrame([
('1234567','123 Main St','10SjtT','[email protected]','ecom','direct','1')
,('877898','234 Main St','efttrt','[email protected]','fbgfbgf','indirect','2')
,('234242','44 fdgfdg St','er','[email protected]','bbbb','indirect','3')
,('543','444 bbtt St','2342reyhr2r','[email protected]','jjjj','indirect','1')
,('7887','7777 ddsss St','ygf','[email protected]','uuuu','indirect','2')
,('122113','333 cdc St','edc','[email protected]','yyyy','indirect','2')]
,['cust_id','address','store_id','email','sales_channel','category',"partitionId"])
dff = df.select("partitionId","cust_id","address",to_json(struct("store_id","category","sales_channel","email",struct( "category" ,"email").alias("c_email"))).alias("body"))
dff.select("body").show(10,False)
在此数据框中,分区键是“partitionId”,它将数据批处理为 50 万条记录,然后我可以将这 50 万条数据记录推送到事件中心。
pyspark foreach 没有给我更多的控制权来批处理和处理 eventhub 发布的数据。
如有任何建议请告诉我。
谢谢, 马诺杰。
您可以尝试以下代码,通过基于
partitionId
的分区将数据写入azure event hub。
您还需要使用
partition count
20 创建事件中心,因为您的数据大约有 20 个分区。
dff = df.select("partitionId","cust_id","address",to_json(struct("store_id","category","sales_channel","email",struct( "category" ,"email").alias("c_email"))).alias("body"))
fdf = dff.select("partitionId","body")
connectionString = "Endpoint=sb://<your_event_hub>.servicebus.windows.net/;SharedAccessKeyName=<access_key_name>;SharedAccessKey=<key>;EntityPath=<event_hub_name>"
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
ds = fdf\
.write.partitionBy("partitionId") \
.format("eventhubs") \
.options(**ehConf) \
.save()
在运行此代码之前,请安装此 Maven 包。
com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22
输出:
我在 azure 流分析预览中检查了结果。