How to query views in Athena from an AWS Glue ETL job

Question (votes: 0, answers: 1)

I have some views in AWS Athena, shared from another AWS account/team, that are accessible in our data catalog. AWS does not natively support accessing views from a Glue ETL job; I get "error code 10" when I try. I can access the data in Athena just fine. How can I work around this?
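For context, this is roughly the call that fails (a minimal sketch; the database and view names are placeholders, and glueContext is the standard Glue job setup):

# "shared_db" and "team_view" are hypothetical placeholders for the shared
# catalog database and the Athena view inside it.
dyf = glueContext.create_dynamic_frame.from_catalog(
    database="shared_db",
    table_name="team_view"  # this is a view, not a table, so the job fails with error code 10
)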

amazon-web-services aws-glue
1 Answer

0 votes

One way you can work around this is to use Boto3 to run the query through Athena itself.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3  # not part of the generated script and needs to be added here
import time  # used below to pause between query-status polls
from awsglue.dynamicframe import DynamicFrame

print("Glue Job Started")  # not needed, but helps confirm the job started when troubleshooting
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()  # setting up the SparkContext
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# These two settings are needed for permission to read data owned by another account
glueContext._jsc.hadoopConfiguration().set("fs.s3.useRequesterPaysHeader", "true")
spark._jsc.hadoopConfiguration().set("fs.s3.useRequesterPaysHeader", "true")

# Establish Boto3 client connectivity
athena_client = boto3.client('athena')  # needed
print("Athena Client Created")  # not needed, but good for troubleshooting
s3 = boto3.client('s3')  # needed

# The block below deletes the current data in your S3 folder, since the UNLOAD
# query will write the view's data there. You need this if you want to overwrite
# the previous run's data; otherwise it is optional.
s3resource = boto3.resource('s3')
bucket = s3resource.Bucket('[your bucket]')  # replace with your S3 bucket name
for obj in bucket.objects.filter(Prefix='ETL/shared/icoms_hp/'):  # delete from the output path
    s3resource.Object(bucket.name, obj.key).delete()

# This is the start of your main query. Note that comments inside the query
# string are sent to Athena, so they must use SQL's -- syntax, not Python's #.
query = """
UNLOAD (
    -- write your SQL query (a SELECT against the view) here
)
TO '[internal S3 path]'  -- where you want the final data to land in your S3 path
WITH (
    format = 'PARQUET',      -- the output format you want
    compression = 'SNAPPY'
);
"""
# The block below starts the query. OutputLocation is where Athena stores the
# query metadata and temporary results during execution.
response = athena_client.start_query_execution(
    QueryString=query,
    ResultConfiguration={
        'OutputLocation': '[your temp s3 output folder]'
    }
)

# The block below polls the query status; the printed ID also lets you find
# the query in the Athena console for troubleshooting.
queryid = response['QueryExecutionId']
print(queryid)
status = athena_client.get_query_execution(QueryExecutionId=queryid)['QueryExecution']['Status']['State']
while status.upper() in ['QUEUED', 'RUNNING']:
    time.sleep(2)  # pause between polls rather than hammering the Athena API
    status = athena_client.get_query_execution(QueryExecutionId=queryid)['QueryExecution']['Status']['State']
    print(f"status - {status}")