(AWS) Athena: query results seem too short

Problem description | Votes: 0 | Answers: 4

The results of my Athena query seem too short, and I'm trying to figure out why.

Setup:

Glue catalog: 118.6 GB in size.
Data: stored in S3 in CSV and JSON format.
Athena query: when I query the whole table, each query returns only about 40K rows, while for a month of data this query should average about 121 million records.

Does Athena cap query result data? Is this a service limit? (The documentation does not suggest that this is the case.)

amazon-web-services amazon-s3 amazon-athena aws-glue
4 Answers
4 votes

So fetching 1,000 results at a time clearly doesn't scale. Thankfully, there is a simple workaround. (Or perhaps this is how it should have been done all along.)

When you run an Athena query, you get back a QueryExecutionId. That ID corresponds to the output file you will find in S3.

Here is a snippet I wrote:

import time
from typing import Dict

import boto3
import botocore.exceptions
import pandas as pd

s3 = boto3.resource("s3")
athena = boto3.client("athena")


def query_athena_to_dataframe(query: str) -> pd.DataFrame:
    response: Dict = athena.start_query_execution(QueryString=query, WorkGroup="<your_work_group>")
    execution_id: str = response["QueryExecutionId"]
    print(execution_id)

    # Wait until the query is finished; get_query_results raises a ClientError
    # while the query is still running (note: it also raises if the query
    # failed, in which case this loop never exits)
    while True:
        try:
            athena.get_query_results(QueryExecutionId=execution_id)
            break
        except botocore.exceptions.ClientError:
            time.sleep(5)

    # Download the full CSV result file that Athena wrote to S3
    local_filename: str = "temp/athena_query_result_temp.csv"
    s3.Bucket("athena-query-output").download_file(execution_id + ".csv", local_filename)
    return pd.read_csv(local_filename)

Make sure the corresponding WorkGroup has a "Query result location" set, e.g. "s3://athena-query-output/"

See also this post with a similar answer: How to Create Dataframe from AWS Athena using Boto3 get_query_results method


3 votes

There seems to be a limit of 1000. You should use NextToken to iterate through the results.

Quoting the GetQueryResults documentation:

MaxResults - The maximum number of results (rows) to return in this request.

Type: Integer

Valid Range: Minimum value of 0. Maximum value of 1000.

Required: No
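The quoted limit can be worked around with a small loop. The sketch below assumes the query has already finished; the helper name fetch_all_rows and the client-as-argument shape are mine, not from the answer. Each get_query_results call returns at most 1000 rows, and you keep passing the returned NextToken back in until the response no longer contains one:

```python
def fetch_all_rows(athena_client, execution_id):
    """Collect every row of a finished Athena query by following NextToken.

    athena_client is a boto3 Athena client (or anything exposing the same
    get_query_results signature); execution_id is the QueryExecutionId.
    """
    rows = []
    kwargs = {"QueryExecutionId": execution_id, "MaxResults": 1000}
    while True:
        page = athena_client.get_query_results(**kwargs)
        rows.extend(page["ResultSet"]["Rows"])
        token = page.get("NextToken")
        if token is None:
            break  # no more pages
        kwargs["NextToken"] = token
    return rows
```

Note that the first row Athena returns is the column header, so you may want to drop rows[0] before processing.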


1 vote

Another option is the paginate-and-count approach. Not sure whether there is a better way to do this, e.g. select count(*) from table ...

Here is complete example code ready to use. With the python boto3 athena api I use a paginator and convert the results to a list of dicts, returning the count along with the results.

There are 2 methods below: the first paginates, the second converts the paginated results to a list of dicts and calculates the count.

Note: converting to a list of dicts is not necessary in this case. If you don't want that, you can modify the code to return only the count.

import time

import boto3


def get_athena_results_paginator(params, athena_client):
    """Run the query described by params and return (results, count).

    :param params: dict with 'query', 'database' and 'workgroup' keys
    :param athena_client: a boto3 Athena client
    :return: (list of row dicts, data-row count)
    """
    query_id = athena_client.start_query_execution(
        QueryString=params['query'],
        QueryExecutionContext={
            'Database': params['database']
        }
        # ,
        # ResultConfiguration={
        #     'OutputLocation': 's3://' + params['bucket'] + '/' + params['path']
        # }
        , WorkGroup=params['workgroup']
    )['QueryExecutionId']
    query_status = None
    while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
        query_status = athena_client.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
        if query_status == 'FAILED' or query_status == 'CANCELLED':
            raise Exception('Athena query with the string "{}" failed or was cancelled'.format(params.get('query')))
        time.sleep(10)
    results_paginator = athena_client.get_paginator('get_query_results')
    results_iter = results_paginator.paginate(
        QueryExecutionId=query_id,
        PaginationConfig={
            'PageSize': 1000
        }
    )
    count, results = result_to_list_of_dict(results_iter)
    return results, count


def result_to_list_of_dict(results_iter):
    """Turn paginated Athena results into a list of dicts plus a row count.

    The first row Athena returns holds the column names; it becomes the
    dict keys and is not counted.

    :param results_iter: page iterator from the get_query_results paginator
    :return: (count, list of row dicts)
    """
    results = []
    column_names = None
    count = 0
    for results_page in results_iter:
        for row in results_page['ResultSet']['Rows']:
            column_values = [col.get('VarCharValue', None) for col in row['Data']]
            if not column_names:
                column_names = column_values
            else:
                count = count + 1
                results.append(dict(zip(column_names, column_values)))
    return count, results


0 votes
import time

import boto3

client = boto3.client('athena')
query = '<your SQL query>'  # Replace with your actual query

res = client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
        'Database': '<your_database>'  # Replace with your actual database
    },
    ResultConfiguration={
        # Replace with your actual S3 output location, e.g.
        # s3://aws-athena-query-results-129137241730-eu-west-1/
        'OutputLocation': '<your_s3_output_location>',
    }
)
while True:
    # Get the query execution status
    execution = client.get_query_execution(QueryExecutionId=res['QueryExecutionId'])
    status = execution['QueryExecution']['Status']['State']
    if status == 'SUCCEEDED':
        # If the query execution succeeded, get the query results
        response = client.get_query_results(QueryExecutionId=res['QueryExecutionId'])
        # Fetch all remaining pages of rows
        while 'NextToken' in response:
            next_response = client.get_query_results(
                QueryExecutionId=res['QueryExecutionId'],
                NextToken=response['NextToken'])
            response['ResultSet']['Rows'].extend(next_response['ResultSet']['Rows'])
            if 'NextToken' in next_response:
                response['NextToken'] = next_response['NextToken']
            else:
                break

        # Results in JSON format --> will have to parse into a pd.DataFrame
        resultados = response
        break
    elif status == 'FAILED' or status == 'CANCELLED':
        # If the query execution failed or was cancelled, raise an exception
        raise Exception('Query execution failed or was cancelled')
    else:
        # Query still running; wait a while before checking the status again
        time.sleep(5)