I'm using Glue and pyspark. I'm writing to two tables like this:
glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(df1, glueContext, "df1"),
    connection_type="custom.spark",
    connection_options=combinedConf
)
glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(df2, glueContext, "df2"),
    connection_type="custom.spark",
    connection_options=combinedConf
)
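I want to combine them into a single transaction, so that either both writes are committed or neither is. I considered Transactions, but I can't see how to pass the transaction id to the glueContext.write_dynamic_frame.from_options function. I checked the documentation but didn't find anything.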
Any idea how to force these two writes to either both succeed or both not happen at all? Thanks.
Edit: I added the transaction id like this, but got:
TypeError: from_options() got an unexpected keyword argument 'transactionId'
tx_id = context.start_transaction(read_only=False)
context.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(df1, gluecontext, "df1"),
    connection_type="custom.spark",
    connection_options=config1,
    transactionId=tx_id
)
context.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(df2, gluecontext, "df2"),
    connection_type="custom.spark",
    connection_options=config2,
    transactionId=tx_id
)
try:
    context.commit_transaction(tx_id)
    print("done writing after transaction")
except Exception:
    context.cancel_transaction(tx_id)
    print("cancel trans")
    raise Exception("the transaction cancelled")
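Separately from the TypeError: in the code above only commit_transaction sits inside the try, so if either write raises, the transaction is never cancelled. A minimal sketch of the intended all-or-nothing flow, assuming only the start_transaction / commit_transaction / cancel_transaction calls the code above already uses. The helper name run_in_transaction and the "one callable per write, receiving the transaction id" shape are hypothetical, chosen so the control flow can be shown without a Glue environment.

```python
from typing import Any, Callable, Iterable


def run_in_transaction(context: Any,
                       writes: Iterable[Callable[[str], None]]) -> str:
    """Run all writes under one transaction: commit if every write succeeds,
    otherwise cancel the transaction and re-raise the original error.

    `context` is assumed to be a GlueContext (or anything exposing the same
    three transaction methods); `writes` is a hypothetical list of callables,
    each performing one table write with the given transaction id.
    """
    tx_id = context.start_transaction(read_only=False)
    try:
        # The writes are inside the try, so a failure in any of them
        # (not only a failure in commit) cancels the transaction.
        for write in writes:
            write(tx_id)
        context.commit_transaction(tx_id)
    except Exception:
        context.cancel_transaction(tx_id)
        raise  # keep the original exception instead of a generic one
    return tx_id
```

In the job, each callable would wrap one of the two write_dynamic_frame calls for df1 and df2 and forward tx_id to it; re-raising the original exception also keeps the real cause of the failure in the job logs.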
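Add transactionId to additional_options, i.e. additional_options={"transactionId": tx_id}, instead of passing it as a separate keyword argument.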
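A minimal sketch of what that looks like, under a few assumptions. As far as I know, write_dynamic_frame.from_options has no additional_options parameter; the catalog-based writer from_catalog does. So this assumes the two target tables are registered in the Data Catalog (for example as Lake Formation governed tables) and are written through from_catalog. Whether a custom.spark connector honors such a transaction is not established by the question. The helper name write_governed_table and its parameter names are hypothetical.

```python
from typing import Any


def write_governed_table(glue_context: Any, frame: Any, database: str,
                         table_name: str, tx_id: str) -> None:
    """Write one DynamicFrame to a Data Catalog table inside transaction tx_id.

    Assumption: the target is a catalog table written via from_catalog,
    whose additional_options dict is where transactionId is passed.
    """
    options = {"transactionId": tx_id}  # same id for every write in the transaction
    glue_context.write_dynamic_frame.from_catalog(
        frame=frame,
        database=database,
        table_name=table_name,
        additional_options=options,
    )
```

In the job this would be called once per table with the same tx_id (once for the DynamicFrame built from df1, once for df2), followed by commit_transaction(tx_id), or cancel_transaction(tx_id) if either write fails. The database and table names are placeholders.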