我有一个库存桶 - 桶里面 - 我有6个文件夹。
在雅典娜每6个文件夹 - 我在雅典娜有6张桌子。现在我必须更新分区 - 当文件被放入6个文件夹中的任何一个时。如何在一个lambda中为s3事件触发器编写多个sql(6 Sql)。
import boto3
def lambda_handler(event, context):
bucket_name = 'some_bucket'
client = boto3.client('athena')
config = {
'OutputLocation': 's3://' + bucket_name + '/',
'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
}
# Query Execution Parameters
sql = 'MSCK REPAIR TABLE some_database.some_table'
context = {'Database': 'some_database'}
client.start_query_execution(QueryString = sql,
QueryExecutionContext = context,
ResultConfiguration = config)
数据库是一样的;但是我有6张不同的桌子。我必须更新所有6个表。
首先,我将检查已删除文件的密钥,并仅更新指向文件被删除的前缀的表。例如。如果您的文件夹和表格是prefix0
,prefix1
,prefix2
等,并且删除的文件具有密钥prefix1/some-file
,则只更新位于prefix1
的表格。没有必要更新其他表,他们的数据没有改变。
但是,我建议不要使用MSCK REPAIR TABLE
。这个命令几乎在所有可能的方面都很糟糕。它的效率非常低,当你向表的前缀添加更多对象时,它的性能会变得越来越差。它看起来并不等待它在你的Lambda中完成,所以至少你没有为它的低效率付出代价,但是有更好的方法来添加分区。
您可以直接使用Glue API(在胶水目录下,Athena表是Glue目录中的表),但实际上有点复杂,因为您需要指定大量元数据(Glue API的缺点)。
我建议你做MSCK REPAIR TABLE …
而不是ALTER TABLE ADD PARTITION …
电话:
改变线
sql = 'MSCK REPAIR TABLE some_database.some_table'
至
sql = 'ALTER TABLE some_database.some_table ADD IF NOT EXISTS PARTITION (…) LOCATION \'s3://…\''
它说…
的部分你必须从对象的密钥中提取。如果您的密钥看起来像s3://some-bucket/pk0=foo/pk1=bar/object.gz
,并且您的表具有分区密钥pk0
和pk1
,则SQL将如下所示:
ALTER TABLE some_database.some_table
ADD IF NOT EXISTS
PARTITION (pk0 = 'foo', pk1 = 'bar') LOCATION 's3://some-bucket/pk0=foo/pk1=bar/'