我为不同的团队开发了不同的 Athena 工作组,以便我可以将他们的查询和查询结果分开。用户希望从其笔记本实例 (JupyterLab) 查询可用的表。我很难找到成功满足从用户特定工作组查询表的要求的代码。我只找到将从主工作组查询表的代码。
我当前使用的代码添加在下面。
from pyathena import connect
import pandas as pd
conn = connect(s3_staging_dir='<ATHENA QUERY RESULTS LOCATION>',
region_name='<YOUR REGION, for example, us-west-2>')
df = pd.read_sql("SELECT * FROM <DATABASE-NAME>.<YOUR TABLE NAME> limit 8;", conn)
df
此代码不起作用,因为用户只能从其特定工作组执行查询,因此运行此代码时会出现错误。它也不涵盖在用户特定工作组中分离用户查询的要求。
关于如何添加更改代码以便我可以从笔记本实例在特定工作组中运行查询有什么建议吗?
pyathena
的文档并不是非常广泛,但是在查看源代码之后我们可以看到connect
只是创建了Connection
类的实例。
def connect(*args, **kwargs):
from pyathena.connection import Connection
return Connection(*args, **kwargs)
现在,在
GitHub上查看
Connection.__init__
的签名后,我们可以看到参数 work_group=None
,其名称与来自 officialAWS Python API
start_query_execution
的 boto3
参数之一的名称相同。以下是他们的文档对此的说明:
WorkGroup(字符串)--正在其中启动查询的工作组的名称。
在完成
Connection
中的用法和导入之后,我们最终得到 BaseCursor 类,该类在底层调用 start_query_execution
,同时解压由 BaseCursor._build_start_query_execution_request
方法组装的参数的字典。这正是我们可以看到向 AWS Athena 提交查询的熟悉语法,特别是以下部分:
if self._work_group or work_group:
request.update({
'WorkGroup': work_group if work_group else self._work_group
})
所以这应该对你的情况有帮助:
import pandas as pd
from pyathena import connect
conn = connect(
s3_staging_dir='<ATHENA QUERY RESULTS LOCATION>',
region_name='<YOUR REGION, for example, us-west-2>',
work_group='<USER SPECIFIC WORKGROUP>'
)
df = pd.read_sql("SELECT * FROM <DATABASE-NAME>.<YOUR TABLE NAME> limit 8;", conn)
我实现了这个,它对我有用。
!pip install pyathena
参考链接
from pyathena import connect
from pyathena.pandas.util import as_pandas
import boto3
query = """
Select * from "s3-prod-db"."CustomerTransaction" ct where date(partitiondate) >= date('2022-09-30') limit 10
"""
query
cursor = connect(s3_staging_dir='s3://s3-temp-prod2/',
region_name=boto3.session.Session().region_name, work_group='data-scientist').cursor()
df = cursor.execute(query)
print(cursor.state)
print(cursor.state_change_reason)
print(cursor.completion_date_time)
print(cursor.submission_date_time)
print(cursor.data_scanned_in_bytes)
print(cursor.output_location)
df = as_pandas(cursor)
print(df)
如果我们不传递
work_group
参数将使用“primary”作为默认的work_group。
如果我们传递
s3_staging_dir='s3://s3-temp-prod2/'
不存在的 s3 存储桶,它将创建这个存储桶。
但是,如果您运行脚本的用户角色不必创建存储桶权限,则会抛出异常。
@Ilya Kisil 的方法对我有用,但如上所述我收到了一个 userWarning。我的解决方案如下:
from pyathena import connect
import pandas as pd
# Specify your Athena query
query = "SELECT * FROM db-name.your_table_name limit 10;"
# Create a connection to Athena
conn = connect(s3_staging_dir='s3://your-bucket-name/query-results/',
region_name='your-region-name',
work_group='your-workgroup')
# Create a cursor
cursor = conn.cursor()
# Execute the query
cursor.execute(query)
# Get column names from cursor description
column_names = [desc[0] for desc in cursor.description]
# Fetch the results as a DataFrame
results = cursor.fetchall()
df = pd.DataFrame(results, columns=column_names)
# Display the results with column headings
print(df)