如何使用/不使用 Glue Crawler 自动执行 ALTER TABLE 添加分区

问题描述 投票:0回答:3

我注意到 ADD PARTITIONS 更有效,因为我在 s3 中处理 JSON 数据。我设置的爬虫需要很长时间才能完成。当我要求它仅“仅抓取新文件夹”时,它并没有真正添加新分区。我还取消选中“使用表中的元数据更新所有新的和现有的分区”选项。

我本质上需要的是在添加新分区时自动添加分区(应该在几秒钟内发生)。我无法从将数据推送到 s3 的代码库中执行此操作,因为他们是仅负责 s3 数据填充的不同团队。我不希望自动化作业遍历每条记录/元数据并修复记录(现在大约需要 40 分钟)。

感谢任何帮助,因为我是 Glue 和 Athena 的新手。

amazon-web-services aws-glue amazon-athena aws-glue-data-catalog
3个回答
0
投票

MSCK REPAIR TABLE table_name
是将新分区更新到现有表的最简单方法。您可以从各种 SDK(例如用于 python 的 boto3)发送此查询:

import boto3

client = boto3.client('athena')
client.start_query_execution(QueryString='MSCK REPAIR TABLE table_name')

当向 S3 存储桶添加新文件或使用事件总线计划事件时,您可以使用触发器在 Lambda 中触发此代码。


0
投票

使用 for 循环,您可以遍历表的分区以提取要使用的分区总数。例如,您可以将以下代码用于 lambda

  import time
  import boto3

  DATABASE = "name_database"
  TABLE = "name_table"
  query_base = "ALTER TABLE "+name_database+"."+name_table+" ADD IF NOT EXISTS PARTITION (dt = '2016-05-14', country = 'IN')"
  
  output='s3://aws-athena-query-results-example'

  def lambda_handler(event, context):
      query = query_base
      client = boto3.client('athena')

# Execution
  response = client.start_query_execution(
      QueryString=query,
      QueryExecutionContext={
        'Database': DATABASE
      },
      ResultConfiguration={
        'OutputLocation': output,
      }
  )
  return response
  return

0
投票

由于不同的团队负责将文件写入 S3 文件夹,事件驱动架构将是在几秒钟内自动添加分区的最佳解决方案。添加 S3 文件时,会触发一个调用 lambda 的事件。 Lambda 将接收 S3 元数据作为输入,并使用元数据获取文件夹名称并执行 Athena ADD PARTITION 或 Glue CREATE PARTITION。使用事件驱动,仅对新文件夹调用 ADD/CREATE 分区。

如果不同团队将 1000 多个文件写入 S3 文件夹,那么将调用 1000 多个 lambda,这不是一个好的解决方案。要解决这个问题,有两种选择。

选项 1:与向 S3 写入文件的团队核实,他们是否可以在完成数据文件写入后删除具有不同扩展名(例如 *.trg)的 0 字节文件。 S3 可以配置为仅针对 .trg 文件调用 lambda。

选项 2:S3 可以配置为将 s3 事件推送到 SQS 队列。然后每 5 或 10 分钟有一个 lambda 计划,批量读取 SQS 队列消息并从这些消息中获取不同文件夹的列表,然后执行 ADD/创建分区。

MSCK REPAIR TABLE 是一项昂贵的操作,因为它递归地列出 S3 中的前缀和对象,如果 S3 前缀或对象较多,则需要很长时间。

© www.soinside.com 2019 - 2024. All rights reserved.