My Athena query results seem far too short. Trying to figure out why?
Setup:
Glue catalog (118.6 GB in size). Data: stored in S3 in both CSV and JSON formats. Athena query: when I query the entire table, I only get 40K results per query, while a month of data should average about 121 million records for that query.
Does Athena cap query result data? Is this a service limit (the documentation does not indicate that this is the case)?
So fetching 1,000 results at a time clearly does not scale. Thankfully, there is a simple workaround. (Or perhaps this is how it should have been done all along.)
When you run an Athena query, you get back a QueryExecutionId. That ID corresponds to the output file you will find in S3.
Here is a snippet I wrote:
import time
from typing import Dict

import boto3
import botocore.exceptions
import pandas as pd

s3 = boto3.resource("s3")
athena = boto3.client("athena")

def run_query(query: str) -> pd.DataFrame:
    response: Dict = athena.start_query_execution(QueryString=query, WorkGroup="<your_work_group>")
    execution_id: str = response["QueryExecutionId"]
    print(execution_id)

    # Wait until the query is finished; get_query_results raises a
    # ClientError while the query is still queued or running
    while True:
        try:
            athena.get_query_results(QueryExecutionId=execution_id)
            break
        except botocore.exceptions.ClientError:
            time.sleep(5)

    # Download the full result file Athena wrote to the workgroup's
    # output location, then load it into a DataFrame
    local_filename: str = "temp/athena_query_result_temp.csv"
    s3.Bucket("athena-query-output").download_file(execution_id + ".csv", local_filename)
    return pd.read_csv(local_filename)
Make sure the corresponding WorkGroup has a "query result location" configured, e.g. "s3://athena-query-output/".
See also this post, which contains a similar answer: How to Create Dataframe from AWS Athena using Boto3 get_query_results method
There appears to be a limit of 1,000. You should use NextToken to iterate over the results.
Quoting the GetQueryResults documentation:
MaxResults — The maximum number of results (rows) to return in this request.
Type: Integer
Valid Range: Minimum value of 0. Maximum value of 1000.
Required: No
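A minimal sketch of that NextToken loop. The client is passed in as a parameter (so any boto3 Athena client works), and the first row of the first page — Athena's header row — is split off from the data rows; the function name and structure are my own, not from the question:

```python
def fetch_all_rows(athena_client, execution_id):
    """Collect every row of an Athena result set by following NextToken."""
    rows = []
    kwargs = {"QueryExecutionId": execution_id, "MaxResults": 1000}
    while True:
        page = athena_client.get_query_results(**kwargs)
        rows.extend(page["ResultSet"]["Rows"])
        token = page.get("NextToken")
        if not token:
            break
        kwargs["NextToken"] = token
    # The first row of the result set is the column header row
    header, data = rows[0], rows[1:]
    return header, data
```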
Another option is a paginate-and-count approach: not sure whether there is a better way to do this, e.g. select count(*) from the table ...
Here is complete working sample code, using the Python boto3 Athena API. I use a paginator and convert the result into a list of dicts, returning the count along with the results.
There are two methods below. The first one paginates; the second converts the paginated result into a list of dicts and computes the count.
Note: converting to a list of dicts is not required in this case. If you do not want that, you can modify the code to keep only the count.
import time


def get_athena_results_paginator(params, athena_client):
    """
    :param params: dict with 'query', 'database' and 'workgroup' keys
    :param athena_client: a boto3 Athena client
    :return: (results, count)
    """
    query_id = athena_client.start_query_execution(
        QueryString=params['query'],
        QueryExecutionContext={
            'Database': params['database']
        },
        # ResultConfiguration={
        #     'OutputLocation': 's3://' + params['bucket'] + '/' + params['path']
        # },
        WorkGroup=params['workgroup']
    )['QueryExecutionId']
    query_status = None
    while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
        query_status = athena_client.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
        if query_status == 'FAILED' or query_status == 'CANCELLED':
            raise Exception('Athena query with the string "{}" failed or was cancelled'.format(params.get('query')))
        time.sleep(10)
    results_paginator = athena_client.get_paginator('get_query_results')
    results_iter = results_paginator.paginate(
        QueryExecutionId=query_id,
        PaginationConfig={
            'PageSize': 1000
        }
    )
    count, results = result_to_list_of_dict(results_iter)
    return results, count


def result_to_list_of_dict(results_iter):
    """
    :param results_iter: page iterator returned by the get_query_results paginator
    :return: (count, results) where results is a list of dicts keyed by column name
    """
    results = []
    column_names = None
    count = 0
    for results_page in results_iter:
        for row in results_page['ResultSet']['Rows']:
            count = count + 1
            column_values = [col.get('VarCharValue', None) for col in row['Data']]
            # The first row of the first page is the header row; use it as the keys
            if not column_names:
                column_names = column_values
            else:
                results.append(dict(zip(column_names, column_values)))
    return count, results
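If you do want a DataFrame rather than a list of dicts, the conversion is direct. A sketch with hypothetical rows matching the shape the helper above returns (pandas assumed available, as in the first answer):

```python
import pandas as pd

# Hypothetical rows in the list-of-dicts shape produced above
rows = [{"id": "1", "name": "a"}, {"id": "2", "name": "b"}]
df = pd.DataFrame(rows)
print(df.shape)  # (2, 2)
```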
import time
import json
import boto3
import pandas as pd
from datetime import datetime, timedelta, date

client = boto3.client('athena')

res = client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
        'Database': ' '  # Replace with your actual database
    },
    ResultConfiguration={
        'OutputLocation': ' ',  # Replace with your actual S3 output location, e.g. aws-athena-query-results-129137241730-eu-west-1
    }
)
while True:
    # Get the query execution status
    execution = client.get_query_execution(QueryExecutionId=res['QueryExecutionId'])
    status = execution['QueryExecution']['Status']['State']

    if status == 'SUCCEEDED':
        # If the query execution succeeded, get the query results
        response = client.get_query_results(QueryExecutionId=res['QueryExecutionId'])

        # Fetch all the remaining pages of rows via NextToken
        while 'NextToken' in response:
            next_response = client.get_query_results(QueryExecutionId=res['QueryExecutionId'], NextToken=response['NextToken'])
            response['ResultSet']['Rows'].extend(next_response['ResultSet']['Rows'])
            if 'NextToken' in next_response:
                response['NextToken'] = next_response['NextToken']
            else:
                break

        # Process the results in JSON format --> will have to parse into a pandas DataFrame
        resultados = response
        break  # stop polling once all results have been collected
    elif status == 'FAILED' or status == 'CANCELLED':
        # If the query execution failed or was cancelled, raise an exception
        raise Exception('Query execution failed or was cancelled')
    else:
        # If the query execution is still running, wait for a while before checking the status again
        time.sleep(5)
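The comment above leaves the parsing step open. A sketch of turning the accumulated Rows structure into a DataFrame, assuming (as Athena's API does) that the first row is the header row; the helper name and sample rows are hypothetical:

```python
import pandas as pd

def rows_to_dataframe(rows):
    """Convert Athena ResultSet Rows (first row = header) into a DataFrame."""
    values = [[col.get('VarCharValue') for col in row['Data']] for row in rows]
    return pd.DataFrame(values[1:], columns=values[0])

# Hypothetical rows in the shape get_query_results returns
rows = [
    {'Data': [{'VarCharValue': 'id'}, {'VarCharValue': 'name'}]},
    {'Data': [{'VarCharValue': '1'}, {'VarCharValue': 'alice'}]},
]
df = rows_to_dataframe(rows)
print(df.columns.tolist())  # ['id', 'name']
```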