从 Notebook 实例查询 Athena 中的表/数据库

问题描述 投票:0回答:3

我为不同的团队开发了不同的 Athena 工作组,以便我可以将他们的查询和查询结果分开。用户希望从其笔记本实例 (JupyterLab) 查询可用的表。我很难找到成功满足从用户特定工作组查询表的要求的代码。我只找到将从主工作组查询表的代码。

我当前使用的代码添加在下面。

from pyathena import connect
import pandas as pd
conn = connect(s3_staging_dir='<ATHENA QUERY RESULTS LOCATION>',
region_name='<YOUR REGION, for example, us-west-2>')


df = pd.read_sql("SELECT * FROM <DATABASE-NAME>.<YOUR TABLE NAME> limit 8;", conn)
df

此代码不起作用,因为用户只能从其特定工作组执行查询,因此运行此代码时会出现错误。它也不涵盖在用户特定工作组中分离用户查询的要求。

关于如何添加更改代码以便我可以从笔记本实例在特定工作组中运行查询有什么建议吗?

amazon-web-services jupyter-notebook amazon-athena amazon-sagemaker pyathena
3个回答
3
投票

pyathena
的文档并不是非常广泛,但是在查看源代码之后我们可以看到
connect
只是创建了
Connection
类的实例。

def connect(*args, **kwargs):
    from pyathena.connection import Connection
    return Connection(*args, **kwargs)

现在,在

GitHub
上查看 Connection.__init__ 的签名后,我们可以看到参数
work_group=None
,其名称与来自
official
AWS Python API start_query_execution
boto3
参数之一的名称相同。以下是他们的文档对此的说明:

WorkGroup(字符串)--正在其中启动查询的工作组的名称。

在完成

Connection
中的用法和导入之后,我们最终得到 BaseCursor 类,该类在底层调用
start_query_execution
,同时解压由
BaseCursor._build_start_query_execution_request
方法组装的参数的字典。这正是我们可以看到向 AWS Athena 提交查询的熟悉语法,特别是以下部分:

if self._work_group or work_group:
    request.update({
        'WorkGroup': work_group if work_group else self._work_group
    })

所以这应该对你的情况有帮助:

import pandas as pd
from pyathena import connect


conn = connect(
    s3_staging_dir='<ATHENA QUERY RESULTS LOCATION>',
    region_name='<YOUR REGION, for example, us-west-2>',
    work_group='<USER SPECIFIC WORKGROUP>'
)

df = pd.read_sql("SELECT * FROM <DATABASE-NAME>.<YOUR TABLE NAME> limit 8;", conn)

0
投票

我实现了这个,它对我有用。

!pip install pyathena

参考链接

from pyathena import connect
from pyathena.pandas.util import as_pandas
import boto3


query = """
    Select * from "s3-prod-db"."CustomerTransaction" ct where  date(partitiondate) >= date('2022-09-30') limit 10
"""
query

cursor = connect(s3_staging_dir='s3://s3-temp-prod2/',
                 region_name=boto3.session.Session().region_name, work_group='data-scientist').cursor()
df = cursor.execute(query)
print(cursor.state)
print(cursor.state_change_reason)
print(cursor.completion_date_time)
print(cursor.submission_date_time)
print(cursor.data_scanned_in_bytes)
print(cursor.output_location)
df = as_pandas(cursor)
print(df)

如果我们不传递

work_group
参数将使用“primary”作为默认的work_group。

如果我们传递

s3_staging_dir='s3://s3-temp-prod2/'
不存在的 s3 存储桶,它将创建这个存储桶。

但是,如果您运行脚本的用户角色不必创建存储桶权限,则会抛出异常。


0
投票

@Ilya Kisil 的方法对我有用,但如上所述我收到了一个 userWarning。我的解决方案如下:

from pyathena import connect
import pandas as pd

# Specify your Athena query
query = "SELECT * FROM db-name.your_table_name limit 10;"

# Create a connection to Athena
conn = connect(s3_staging_dir='s3://your-bucket-name/query-results/',
               region_name='your-region-name', 
               work_group='your-workgroup')

# Create a cursor
cursor = conn.cursor()

# Execute the query
cursor.execute(query)

# Get column names from cursor description
column_names = [desc[0] for desc in cursor.description]

# Fetch the results as a DataFrame
results = cursor.fetchall()
df = pd.DataFrame(results, columns=column_names)

# Display the results with column headings
print(df)
© www.soinside.com 2019 - 2024. All rights reserved.