我在Databricks上构建代码以实时读取增量表(读取流),然后需要将此流数据发布到API。在我阅读的所有论文中,writestream仅用于创建文件(.csv,.avro,.parquet等)或发送到事件中心。可以使用writestream发布到API!?
我的代码:
from pyspark.sql.functions import unix_timestamp, round, col
import json
import pandas as pd
from pyspark.sql.functions import lit
import requests
#tried with foreach_batch but it doens't work
def foreach_batch_function(df,epochId):
r2 = requests.post('https://demo.api.com/index.php/api/v5/smsrequest/', data=str(df), verify=False)
r2.json()
pass
rs = spark.readStream.format("delta").option('path','/mnt/gen2/raw/mytable').load()
df = rs.select(round('id_cliente_fat').alias('id_cliente_fat'),'fone_fat','nome_fat',unix_timestamp('dt_nasc_fat','YYYY-MM-DD').cast('timestamp').cast('date').alias('birth_date'),'email_fat')
df2 = df.selectExpr('id_cliente_fat as identifier_code','fone_fat as phone_number','nome_fat as name','birth_date','email_fat as email')
data = {'authentication':{'username':'user','password':'pass'}}
r = requests.post('https://demo.api.com/index.php/api/v5/login/', data=json.dumps(data), verify=False).json()
df3 = df2.withColumn("steps", lit("[1,2,4,7]")).withColumn("place_id", lit(164)).withColumn("token", lit(r["authentication"]["token"]))
df4 = df3.select(to_json(struct(struct("token").alias("authentication"), struct("identifier_code", "phone_number", "name", "birth_date", "email","steps","place_id").alias("smsrequest").alias("smsrequest"))).alias(""))
df4.writeStream.foreachBatch(foreach_batch_function).start()
您需要使用.collect()
方法将数据传输到驱动程序(不建议使用大量数据)。>>
尝试这样的事情:
def foreach_batch_function(df,epochId):
# Create a Json with kews the name of the columns and values the values of the df
json_data = map(lambda row: row.asDict(), df.collect())
r2 = requests.post('https://demo.api.com/index.php/api/v5/smsrequest/', data=json_data, verify=False)
r2.json()
pass