合并(插入/更新)

问题描述 投票:0回答:1

我需要在 PySpark 上创建逻辑来更新包含数百万条记录的表。 这些表从 BigQuery 获取数据。 当插入新流时,它应该将现有记录与新记录进行比较:比较必须通过键完成。 如果键与新流的键匹配,则使用所有字段更新整个记录。 否则,它必须根据相应的键插入新记录。 所有这些都是为了避免重复。 你有什么建议吗?

此刻我想到创建一个临时表并进行合并。但我不喜欢这个想法,因为它会创建太多数据。

pyspark google-bigquery merge insert updates
1个回答
0
投票

我更喜欢使用

foreachBatch
,它不仅适用于 Sparkstreaming,也适用于静态数据帧

给出执行此操作的示例方法

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("read from bigquery and upsert") \
    .getOrCreate()
mydf = spark.read.format("bigquery") \
    .option("table", "yourtable") \
    .load()

然后进行微批量逻辑 使用 foreachBatch 的 UPSERT 逻辑 过滤现有记录 使用内连接识别需要更新的现有记录 使用 left_anti join 识别需要插入的新记录 更新现有记录 插入新记录

 def do_micro_batch(df, batch_id):
    # keys for comparison
    keys = ['key1', 'key2']

     
    existing_records = bigquery_df.select(*keys)

   
    updates = df.join(existing_records, on=keys, how='inner')

    new_records = df.join(existing_records, on=keys, how='left_anti')
 
    updates.write.format("bigquery") \
        .option("table", "yourtable") \
        .option("writeDisposition", "WRITE_APPEND") \
        .save()

    new_records.write.format("bigquery") \
        .option("table", "yourtable") \
        .option("writeDisposition", "WRITE_APPEND") \
        .save()



使用 foreachBatch 进行处理

mydf.writeStream \
    .foreachBatch(do_micro_batch) \
    .start() \
    .awaitTermination()
© www.soinside.com 2019 - 2024. All rights reserved.