我在S3中有一个hudi表,它在Glue目录中注册。我希望编写一个 Glue pyspark 作业来删除在其中一个字段中具有特定值的所有记录。
我设法找到的代码示例都是从将表加载到 DataFrame 开始的,然后应用过滤并最终写回表。
这里有一个示例代码来描述所需的结果:
import sys
from awsglue.context import GlueContext
from pyspark.context import SparkContext
# Create a GlueContext
sc = SparkContext()
glueContext = GlueContext(sc)
# Get the arguments passed to the script
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
# Specify the Hudi table path on Amazon S3
hudi_table_path = "s3://your-s3-bucket/hudi-table-path"
# Value to filter records on
value_to_delete = "your_value"
# Create a GlueContext and get the dynamic frame for the Hudi table
hudi_dyf = glueContext.create_dynamic_frame.from_catalog(database="your-database-name", table_name="your-table-name")
# Filter the records based on the field with the specified value
filtered_dyf = Filter.apply(frame=hudi_dyf, f=lambda x: x["your_field_name"] != value_to_delete)
# Delete the filtered records from the Hudi table
glueContext.write_dynamic_frame.from_catalog(frame=filtered_dyf, database="your-database-name", table_name="your-table-name", transformation_ctx="datasink")
# Commit the deletion operation
glueContext.commit()
# Cleanup
glueContext.cleanup()
我想知道是否有一种方法可以在不将整个表加载到内存中的情况下完成此操作 - 类似于 SQL 中的删除命令?我担心的是具有大量记录的表的性能和成本。
下面是另一种执行硬删除的方法,在 Spark SQL 的帮助下,您不需要在完整数据上创建 DataFrame,然后执行过滤器转换。这是我在工作中使用的示例代码,用于在 emp_id =1 的测试表上执行硬删除
hard_delete_df = spark.sql("SELECT * FROM mydb.emptable where emp_id='1' ")
print(hard_delete_df.show())
print("\n")
hudi_options['hoodie.datasource.write.operation'] = 'delete'
hard_delete_df.write.format("hudi").options(**hudi_options).mode("append").save(final_base_path)
也可以直接使用SparkSQL直接删除,请参考官方文档
DELETE FROM hudi_table WHERE uuid = '3f3d9565-7261-40e6-9b39-b8aa784f95e2';
希望这有帮助!