我在 S3 上有一个 Parquet 数据湖，并希望（通过 Athena）对其进行查询。
为了优化查询性能，我打算使用以下脚本定期压缩（合并）这些文件：
import boto3
import datetime
import math
from awsglue.utils import getResolvedOptions
import sys
# Resolve the job arguments this script needs from the command line (sys.argv).
args = getResolvedOptions(
    sys.argv,
    [
        "bucket_output",
        "bucket_queries",
        "db_name",
        "table_prefix",
        "compacted_file_size_mb",
    ],
)
# bucket_output:  bucket whose <device>/<table>/ folders are compacted
# bucket_queries: bucket used as the Athena query results location
# db_name:        Glue/Athena database holding the tables
# table_prefix:   prefix prepended to a table folder name to get its Glue table name
bucket_output, bucket_queries, db_name, table_prefix = (
    args["bucket_output"],
    args["bucket_queries"],
    args["db_name"],
    args["table_prefix"],
)
# Target size of each compacted file, used to derive the CTAS bucket_count.
compacted_file_size_mb = int(args["compacted_file_size_mb"])
# Suffix for the temporary CTAS table name and its S3 folder.
table_suffix_temp = "_temporarytable"
# general function for running athena queries
def run_athena_query(query, database):
    """Submit *query* to Athena against *database* and return its execution id.

    The query is only started here; use wait_for_query_completion to block on it.
    Query results are written under s3://<bucket_queries>/.
    """
    athena = boto3.client("athena")
    output_location = f"s3://{bucket_queries}/"
    started = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return started["QueryExecutionId"]
# general function for awaiting result of athena query
def wait_for_query_completion(query_execution_id, poll_interval_seconds=1.0):
    """Block until an Athena query reaches a terminal state and return that state.

    Args:
        query_execution_id: id returned by ``start_query_execution``.
        poll_interval_seconds: pause between status polls. Polling without a
            pause would call GetQueryExecution continuously for the whole
            duration of the query, wasting API calls and risking throttling.

    Returns:
        One of "SUCCEEDED", "FAILED" or "CANCELLED".
    """
    import time

    client = boto3.client("athena")
    while True:
        response = client.get_query_execution(QueryExecutionId=query_execution_id)
        status_info = response["QueryExecution"]["Status"]
        status = status_info["State"]
        if status in ("SUCCEEDED", "FAILED", "CANCELLED"):
            if status != "SUCCEEDED":
                # Surface Athena's explanation; callers only see the bare state.
                reason = status_info.get("StateChangeReason")
                if reason:
                    print(f"Athena query {query_execution_id} {status}: {reason}")
            return status
        time.sleep(poll_interval_seconds)
# function for running athena CTAS query to compact parquet tables
# The CTAS creates a temporary compacted table in a new s3 location with suffix table_suffix_temp
def compact_table(device, table_excl_prefix, database, bucketed_by_column, bucket_count):
    """Rewrite a table into ``bucket_count`` bucketed, Snappy-compressed Parquet files.

    The new table is ``<database>.<table_prefix><table_excl_prefix><table_suffix_temp>``,
    stored at ``s3://<bucket_output>/<device>/<table_excl_prefix><table_suffix_temp>/``.

    Note: rows are assigned to files by ``bucketed_by_column``. If that column
    is not the one queries filter on (e.g. a timestamp), each output file may
    hold rows from the whole time range, which likely weakens Parquet
    row-group skipping for such filters.

    Returns:
        The terminal Athena state: "SUCCEEDED", "FAILED" or "CANCELLED".
    """
    source_table = f"{database}.{table_prefix}{table_excl_prefix}"
    temp_table = f"{source_table}{table_suffix_temp}"
    temp_location = f"s3://{bucket_output}/{device}/{table_excl_prefix}{table_suffix_temp}/"
    query = f"""
    CREATE TABLE {temp_table}
    WITH (
        format = 'PARQUET',
        parquet_compression = 'SNAPPY',
        bucketed_by = ARRAY['{bucketed_by_column}'],
        bucket_count = {bucket_count},
        external_location = '{temp_location}'
    ) AS
    SELECT * FROM {source_table};
    """
    query_execution_id = run_athena_query(query, database)
    # The terminal state is returned as-is; callers check for "SUCCEEDED".
    return wait_for_query_completion(query_execution_id)
# move the CTAS compacted S3 objects into the original S3 table path
def move_s3_temp_table(bucket, device, table_excl_prefix):
    """Move every object from the CTAS temp folder into the table's own folder.

    Each object under ``<device>/<table_excl_prefix><table_suffix_temp>/`` is
    copied to the same relative key under ``<device>/<table_excl_prefix>/``
    (with a ``.parquet`` extension added if missing), then the temp object is
    deleted.

    Args:
        bucket: boto3 ``s3.Bucket`` resource that holds the table data.
        device: top-level device folder name.
        table_excl_prefix: table folder name (Glue table name without table_prefix).
    """
    temp_prefix = f"{device}/{table_excl_prefix}{table_suffix_temp}/"
    final_prefix = f"{device}/{table_excl_prefix}/"
    for obj in bucket.objects.filter(Prefix=temp_prefix):
        # Rebuild the key from the folder prefix rather than str.replace, which
        # would also strip the suffix wherever else it appeared in the key.
        new_key = final_prefix + obj.key[len(temp_prefix):]
        if not new_key.endswith(".parquet"):
            new_key += ".parquet"
        # Copy from the bucket actually being iterated, not a global bucket name.
        copy_source = {"Bucket": bucket.name, "Key": obj.key}
        bucket.copy(copy_source, new_key)
        obj.delete()
    print(
        f"Success: Moved all s3 objects from {device}/{table_excl_prefix}{table_suffix_temp} to {device}/{table_excl_prefix}"
    )
# delete s3 objects in the original table folder once CTAS query is done (if object is part of CTAS)
def delete_s3_table(bucket, device, table_excl_prefix, ctas_start_time):
    """Delete objects in ``<device>/<table_excl_prefix>/`` modified before *ctas_start_time*.

    Objects last modified at or after *ctas_start_time* are kept (in main, this
    includes the compacted files moved in after the CTAS started). The trailing
    "/" in the prefix keeps the temp ``..._temporarytable/`` folder out of scope.
    """
    folder = f"{device}/{table_excl_prefix}/"
    for summary in bucket.objects.filter(Prefix=folder):
        if summary.last_modified >= ctas_start_time:
            continue
        summary.delete()
    print(f"Success: Deleted all s3 objects created before {ctas_start_time} in {device}/{table_excl_prefix}")
# get glue metadata for use in CTAS query
def get_table_meta(glue, database, device, table_excl_prefix):
    """Return ``(second_column_name, size_in_mb)`` for a Glue table.

    The table looked up is ``<table_prefix><table_excl_prefix>`` in *database*.
    ``device`` is accepted but not used. Size is read from the table's
    ``sizeKey`` parameter (presumably set by the Glue crawler — verify) and
    converted with 1 MB = 1,000,000 bytes.
    """
    table = glue.get_table(DatabaseName=database, Name=f"{table_prefix}{table_excl_prefix}")["Table"]
    columns = table["StorageDescriptor"]["Columns"]
    bucket_column = columns[1]["Name"]
    size_bytes = int(table["Parameters"]["sizeKey"])
    return bucket_column, size_bytes / 1000000
def table_exists_in_glue(glue, database, table_excl_prefix):
    """Return True if *database* contains a Glue table named *table_excl_prefix*.

    Despite the parameter name, main passes the full (prefixed) table name.
    Only EntityNotFoundException is treated as "missing"; other errors propagate.
    """
    try:
        glue.get_table(DatabaseName=database, Name=table_excl_prefix)
    except glue.exceptions.EntityNotFoundException:
        return False
    else:
        return True
def _list_subfolders(s3_client, bucket_name, prefix=""):
    """Return the names of the immediate sub-folders of *prefix* in *bucket_name*.

    Paginates because a single list_objects_v2 call returns at most 1000
    entries, and tolerates prefixes with no sub-folders (the response then has
    no "CommonPrefixes" key).
    """
    paginator = s3_client.get_paginator("list_objects_v2")
    names = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix, Delimiter="/"):
        for cp in page.get("CommonPrefixes", []):
            names.append(cp["Prefix"][len(prefix):].rstrip("/"))
    return names


def main():
    """Compact every Glue-catalogued table stored under ``<device>/<table>/`` in bucket_output.

    For each table folder: skip it if Glue has no matching table, run an Athena
    CTAS into a temporary location, and on success move the compacted objects
    back into the table folder, delete the pre-CTAS objects and drop the
    temporary Glue table.
    """
    # initialize s3 (boto3) and glue
    s3_client = boto3.client("s3")
    s3 = boto3.resource("s3")
    bucket = s3.Bucket(bucket_output)
    glue = boto3.client("glue")

    # list device folders
    devices = _list_subfolders(s3_client, bucket_output)
    print(f"\nThe s3 bucket {bucket_output} contains the following devices: ", devices)

    for device in devices:
        # list all tables in device folder
        tables_excl_prefix = _list_subfolders(s3_client, bucket_output, f"{device}/")
        for table_excl_prefix in tables_excl_prefix:
            print(f"\nNow processing table {table_prefix}{table_excl_prefix}")

            # A folder ending in the temp suffix is most likely leftover CTAS output
            # from an interrupted run; compacting it would nest another temp suffix.
            if table_excl_prefix.endswith(table_suffix_temp):
                print(f"Warning: {device}/{table_excl_prefix} looks like a temporary CTAS folder. Skipping")
                continue

            # skip tables that are not mapped by glue crawler
            if not table_exists_in_glue(glue, db_name, f"{table_prefix}{table_excl_prefix}"):
                print(
                    f"Warning: Table {table_prefix}{table_excl_prefix} does not exist in Glue Data Catalog. Skipping"
                )
                continue

            # extract meta data from the table for use in CTAS query
            bucketed_by_column, size_in_mb = get_table_meta(glue, db_name, device, table_excl_prefix)
            # An empty table would give ceil(0) == 0, which is not a usable bucket count.
            bucket_count = max(1, math.ceil(size_in_mb / compacted_file_size_mb))
            print(f"Calculating bucket_count as math.ceil({size_in_mb} / {compacted_file_size_mb}) = {bucket_count}")

            # get current time, then run CTAS compaction query
            ctas_start_time = datetime.datetime.now(datetime.timezone.utc)
            status = compact_table(device, table_excl_prefix, db_name, bucketed_by_column, bucket_count)
            print("CTAS query status: ", status)
            if status != "SUCCEEDED":
                # if CTAS query fails, no further action is taken for this table
                print(f"Error compacting {device}/{table_excl_prefix}")
                continue

            # copy CTAS output from temp S3 path to original table path, delete all
            # pre-query S3 objects and delete the temp CTAS table
            move_s3_temp_table(bucket, device, table_excl_prefix)
            delete_s3_table(bucket, device, table_excl_prefix, ctas_start_time)
            glue.delete_table(DatabaseName=db_name, Name=f"{table_prefix}{table_excl_prefix}{table_suffix_temp}")


main()
该脚本按预期工作，能够压缩 Parquet 文件，不过与原始 Parquet 文件相比，它也改变了文件格式以及其他一些属性。
但是，我观察到了一个意料之外的现象。
对原始数据运行下面的查询时，在总计约 250 MB 的数据中只扫描了 12.9 MB，这是因为我只读取了一列、约 5 分钟的数据。
然而，用上述脚本压缩数据之后再运行同一查询，扫描量变成了 162.8 MB，就好像为了得到相同的结果扫描了全部数据。这是为什么？
-- Average of signal a1 over a short time window; the filter is on the timestamp column t
SELECT
AVG(a1) as AVG_a1
FROM
tbl_11111111_a_allfiles
WHERE
t BETWEEN TIMESTAMP '2000-01-09 17:58:17' AND TIMESTAMP '2000-01-09 18:02:24'
请注意，在这两种情况下（压缩前/压缩后）我的数据都没有分区。另外，压缩前我有 5 个 54.3 MB 的“原始”Parquet 文件，压缩后则是 14 个 15.8 MB 的文件。两次查询返回的结果完全相同。我还确认了，重新运行 AWS Glue 爬网程序后，这一现象依然存在。
还要说明的是，我在 CTAS 查询中按其中一个信号列（例如车速）进行分桶，而过滤条件使用的是时间戳列。我猜问题可能在于我“失去”了裁剪（跳过）行组的能力，因为压缩后的数据不再按时间戳列进行最优的分组/排序。那么，有没有办法在把数据合并成压缩文件的同时，仍让时间戳列作为主要的过滤条件？
您使用 `bucketed_by` 列来组织文件。如果该列类似于 `vehicle speed`（车速），那么生成的许多文件都可能包含满足时间戳过滤条件 `BETWEEN TIMESTAMP '2000-01-09 17:58:17' AND TIMESTAMP '2000-01-09 18:02:24'` 的行，从而导致扫描更多的 MB。原因在于：Parquet 的行组跳过依赖每个行组中时间戳列的最小值/最大值统计信息；按车速分桶后，每个文件（及其行组）都可能包含整个时间范围内的行，这些统计信息几乎覆盖全部时间范围，因此无法跳过它们。
我假设您有充分的理由按 `vehicle speed` 进行分桶，并希望继续使用该列分桶。那么，一种做法是再按日期进行分区，这样查询只需读取与过滤时间范围相对应的分区。否则，正如您所观察到的，不进行分桶本身就能减少扫描的 MB 数。由于 Parquet 是面向列的存储格式，即使所有数据都合并在一个文件中，它也针对列上的过滤进行了优化。