我需要在 PySpark 上创建逻辑来更新包含数百万条记录的表。 这些表从 BigQuery 获取数据。 当插入新流时,它应该将现有记录与新记录进行比较:比较必须通过键完成。 如果键与新流的键匹配,则使用所有字段更新整个记录。 否则,它必须根据相应的键插入新记录。 所有这些都是为了避免重复。 你有什么建议吗?
此刻我想到创建一个临时表并进行合并。但我不喜欢这个想法,因为它会创建太多数据。
我更喜欢使用
foreachBatch
,它不仅适用于 Sparkstreaming,也适用于静态数据帧
给出执行此操作的示例方法
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder \
.appName("read from bigquery and upsert") \
.getOrCreate()
mydf = spark.read.format("bigquery") \
.option("table", "yourtable") \
.load()
然后进行微批量逻辑 使用 foreachBatch 的 UPSERT 逻辑 过滤现有记录 使用内连接识别需要更新的现有记录 使用 left_anti join 识别需要插入的新记录 更新现有记录 插入新记录
def do_micro_batch(df, batch_id):
# keys for comparison
keys = ['key1', 'key2']
existing_records = bigquery_df.select(*keys)
updates = df.join(existing_records, on=keys, how='inner')
new_records = df.join(existing_records, on=keys, how='left_anti')
updates.write.format("bigquery") \
.option("table", "yourtable") \
.option("writeDisposition", "WRITE_APPEND") \
.save()
new_records.write.format("bigquery") \
.option("table", "yourtable") \
.option("writeDisposition", "WRITE_APPEND") \
.save()
使用 foreachBatch 进行处理
mydf.writeStream \
.foreachBatch(do_micro_batch) \
.start() \
.awaitTermination()