AWS Athena CTAS used to compact Parquet tables causes queries to scan all the data


I have a Parquet data lake on S3 that I want to query.

To optimize query performance, my goal is to periodically compact my files using the following script:

import boto3
import datetime
import math
import sys
import time

from awsglue.utils import getResolvedOptions

args = getResolvedOptions(
    sys.argv, ["bucket_output", "bucket_queries", "db_name", "table_prefix", "compacted_file_size_mb"]
)

bucket_output = args["bucket_output"]
bucket_queries = args["bucket_queries"]
db_name = args["db_name"]
table_prefix = args["table_prefix"]
compacted_file_size_mb = int(args["compacted_file_size_mb"])

table_suffix_temp = "_temporarytable"


# general function for running athena queries
def run_athena_query(query, database):
    client = boto3.client("athena")
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={
            "OutputLocation": f"s3://{bucket_queries}/"  # Replace with your Athena query results location
        },
    )
    return response["QueryExecutionId"]


# general function for awaiting result of athena query
def wait_for_query_completion(query_execution_id):
    client = boto3.client("athena")
    while True:
        response = client.get_query_execution(QueryExecutionId=query_execution_id)
        status = response["QueryExecution"]["Status"]["State"]
        if status in ["SUCCEEDED", "FAILED", "CANCELLED"]:
            break
        time.sleep(2)  # pause between polls to avoid hammering the Athena API
    return status


# function for running athena CTAS query to compact parquet tables
# The CTAS creates a temporary compacted table in a new s3 location with suffix table_suffix_temp
def compact_table(device, table_excl_prefix, database, bucketed_by_column, bucket_count):
    query = f"""
    CREATE TABLE {database}.{table_prefix}{table_excl_prefix}{table_suffix_temp}
    WITH (
        format = 'PARQUET',
        parquet_compression = 'SNAPPY',
        bucketed_by = ARRAY['{bucketed_by_column}'], 
        bucket_count = {bucket_count},
        external_location = 's3://{bucket_output}/{device}/{table_excl_prefix}{table_suffix_temp}/'
    ) AS
    SELECT * FROM {database}.{table_prefix}{table_excl_prefix};
    """

    query_execution_id = run_athena_query(query, database)
    status = wait_for_query_completion(query_execution_id)
    if status != "SUCCEEDED":
        return status

    return "SUCCEEDED"


# move the CTAS compacted S3 objects into the original S3 table path
def move_s3_temp_table(bucket, device, table_excl_prefix):
    for obj in bucket.objects.filter(Prefix=f"{device}/{table_excl_prefix}{table_suffix_temp}/"):
        new_key = obj.key.replace(table_suffix_temp, "") + ".parquet"
        copy_source = {"Bucket": bucket_output, "Key": obj.key}
        bucket.copy(copy_source, new_key)
        obj.delete()

    print(
        f"Success: Moved all s3 objects from {device}/{table_excl_prefix}{table_suffix_temp} to {device}/{table_excl_prefix}"
    )


# delete s3 objects in the original table folder once CTAS query is done (if object is part of CTAS)
def delete_s3_table(bucket, device, table_excl_prefix, ctas_start_time):
    table_path = bucket.objects.filter(Prefix=f"{device}/{table_excl_prefix}/")
    for obj in table_path:
        if obj.last_modified < ctas_start_time:
            obj.delete()
    print(f"Success: Deleted all s3 objects created before {ctas_start_time} in {device}/{table_excl_prefix}")


# get glue metadata for use in CTAS query
def get_table_meta(glue, database, device, table_excl_prefix):
    response = glue.get_table(DatabaseName=database, Name=f"{table_prefix}{table_excl_prefix}")

    # Extract the name of the second column
    second_column = response["Table"]["StorageDescriptor"]["Columns"][1]["Name"]
    size_in_bytes = int(response["Table"]["Parameters"]["sizeKey"])
    size_in_mb = size_in_bytes / 1000000

    return second_column, size_in_mb


def table_exists_in_glue(glue, database, table_excl_prefix):
    try:
        glue.get_table(DatabaseName=database, Name=table_excl_prefix)
        return True
    except glue.exceptions.EntityNotFoundException:
        return False


def main():
    # initialize s3 (boto3) and glue
    s3_client = boto3.client("s3")
    s3 = boto3.resource("s3")
    bucket = s3.Bucket(bucket_output)
    glue = boto3.client("glue")

    # list device folders
    response = s3_client.list_objects_v2(Bucket=bucket_output, Delimiter="/")
    devices = [cp["Prefix"].rstrip("/") for cp in response["CommonPrefixes"]]
    print(f"\nThe s3 bucket {bucket_output} contains the following devices: ", devices)

    for device in devices:
        # list all tables in device folder
        table_response = s3_client.list_objects_v2(Bucket=bucket_output, Prefix=device + "/", Delimiter="/")
        tables_excl_prefix = [
            cp["Prefix"].split(device + "/")[-1].rstrip("/") for cp in table_response["CommonPrefixes"]
        ]

        for table_excl_prefix in tables_excl_prefix:
            print(f"\nNow processing table {table_prefix}{table_excl_prefix}")

            # skip tables that are not mapped by glue crawler
            if not table_exists_in_glue(glue, db_name, f"{table_prefix}{table_excl_prefix}"):
                print(
                    f"Warning: Table {table_prefix}{table_excl_prefix} does not exist in Glue Data Catalog. Skipping"
                )
                continue

            # extract meta data from the table for use in CTAS query
            second_column, size_in_mb = get_table_meta(glue, db_name, device, table_excl_prefix)
            bucketed_by_column = second_column
            
            bucket_count = math.ceil(size_in_mb / compacted_file_size_mb)
            print(f"Calculating bucket_count as math.ceil({size_in_mb} / {compacted_file_size_mb}) = {bucket_count}")

            # get current time, then run CTAS compaction query
            ctas_start_time = datetime.datetime.now(datetime.timezone.utc)
            status = compact_table(device, table_excl_prefix, db_name, bucketed_by_column, bucket_count)
            print("CTAS query status: ", status)

            if status == "SUCCEEDED":
                # copy CTAS output from temp S3 path to original table path, delete all pre query S3 objects and delete the temp CTAS table
                move_s3_temp_table(bucket, device, table_excl_prefix)
                delete_s3_table(bucket, device, table_excl_prefix, ctas_start_time)
                glue.delete_table(DatabaseName=db_name, Name=f"{table_prefix}{table_excl_prefix}{table_suffix_temp}")
            else:
                # if CTAS query fails, no further action is taken for this table
                print(f"Error compacting {device}/{table_excl_prefix}")


main()

This works as expected and compacts the Parquet files, although the output also differs from my original Parquet files in format and some other properties.

However, I have observed some unexpected behavior.

When I run the following query against the original data, only 12.9 MB of the roughly 250 MB total is scanned. That is because I only pull a single column for a 5-minute window of data.

However, if I run the same query after compacting the data with the script above, 162.8 MB is scanned, as if all the data had to be scanned to produce the same result. Why is that?

SELECT
    AVG(a1) AS AVG_a1
FROM
    tbl_11111111_a_allfiles
WHERE
    t BETWEEN TIMESTAMP '2000-01-09 17:58:17' AND TIMESTAMP '2000-01-09 18:02:24'
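
For reference, the scanned-data figures above can be read programmatically from the Athena query statistics once a query has finished. A minimal sketch using boto3, where get_scanned_mb is a hypothetical helper name and the query execution ID is the one returned by start_query_execution:

import boto3

def get_scanned_mb(query_execution_id):
    # DataScannedInBytes is populated once the query has completed
    client = boto3.client("athena")
    response = client.get_query_execution(QueryExecutionId=query_execution_id)
    return response["QueryExecution"]["Statistics"]["DataScannedInBytes"] / 1_000_000

# example: print(get_scanned_mb(query_execution_id))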

Note that my data is unpartitioned in both cases (before and after compaction). Also note that before compaction I have 5 "raw" Parquet files of 54.3 MB each, and after compaction I have 14 files of 15.8 MB each. The actual query results do not change between the two runs. I also verified that the behavior persists after re-running my AWS Glue crawler.

Also note that I bucket by one of the signal columns in the CTAS query (which could be, say, vehicle speed), while my filtering uses the timestamp. I suppose the problem may be that I "lose" the ability to prune row groups, because the compacted data is no longer optimally grouped/sorted by the timestamp column. But is there a way to chunk the data into compacted files while keeping the timestamp column as my primary filter?

amazon-web-services aws-glue amazon-athena
1 Answer
As you mentioned, bucketing will organize the files according to your bucketed_by column. If that column is something like vehicle speed, then many of the resulting files are likely to contain rows that satisfy your timestamp filter BETWEEN TIMESTAMP '2000-01-09 17:58:17' AND TIMESTAMP '2000-01-09 18:02:24'. That leads to more MB being scanned.

I assume you have good reasons for bucketing by vehicle speed and want to keep bucketing by that column. In that case, one approach is to additionally partition by date, as sketched below.
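
A minimal sketch of what such a CTAS could look like, reusing run_athena_query and the variables from the script in the question; the table name mytable_compacted, the device/table S3 path, and the assumption that the timestamp column is named t are all placeholders. Also note that a single CTAS can write at most 100 partitions:

partitioned_query = f"""
CREATE TABLE {db_name}.{table_prefix}mytable_compacted
WITH (
    format = 'PARQUET',
    parquet_compression = 'SNAPPY',
    external_location = 's3://{bucket_output}/mydevice/mytable_compacted/',
    partitioned_by = ARRAY['dt']
) AS
SELECT
    *,
    DATE(t) AS dt  -- partition column, must be the last column in the SELECT
FROM {db_name}.{table_prefix}mytable;
"""
query_execution_id = run_athena_query(partitioned_query, db_name)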

Otherwise, not bucketing at all, as you have already observed, improves the MB scanned. Since Parquet is column-oriented, it is optimized for column filters even when everything is grouped into a single file.
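
If you drop bucketed_by entirely, one thing worth trying is sorting the compacted output by the timestamp column, so that each Parquet file, and each row group within it, covers a narrow time range; that is what allows min/max statistics on t to prune data again. A sketch using the same placeholder names as above; verify that your Athena engine version accepts ORDER BY inside a CTAS, and note that a global sort can be expensive on large tables:

sorted_query = f"""
CREATE TABLE {db_name}.{table_prefix}mytable_sorted
WITH (
    format = 'PARQUET',
    parquet_compression = 'SNAPPY',
    external_location = 's3://{bucket_output}/mydevice/mytable_sorted/'
) AS
SELECT *
FROM {db_name}.{table_prefix}mytable
ORDER BY t;  -- assumes the timestamp column is named t
"""
query_execution_id = run_athena_query(sorted_query, db_name)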
