是否可以通过 CloudFormation 模板创建 Athena 视图?我可以使用 Athena Dashboard 创建视图,但我想使用 CloudFormation(CF)模板以编程方式执行此操作。在 AWS 文档中找不到任何详细信息,因此不确定是否支持。
谢谢。
通过 CloudFormation 模板创建 Athena 视图的最佳方法是使用自定义资源和 Lambda。我们必须提供视图创建和删除的方法。例如,使用 crhelper 库,Lambda 可以定义为:
from __future__ import print_function
from crhelper import CfnResource
import logging
import os
import boto3
# Module-level logger shared by every handler in this file.
logger = logging.getLogger(__name__)

# crhelper entry point; the lifecycle functions below register themselves on it
# through its decorators.
helper = CfnResource(
    json_logging=False,
    log_level='DEBUG',
    boto_level='CRITICAL',
    sleep_on_delete=120,
)

try:
    # Everything that can fail while the module is loading (client creation,
    # missing environment variables) sits inside this guard so the failure is
    # handed to crhelper instead of escaping as an import error.
    client = boto3.client('athena')
    ATHENA_WORKGROUP = os.environ['athena_workgroup']
    DATABASE = os.environ['database']
    QUERY_CREATE = os.environ['query_create']
    QUERY_DROP = os.environ['query_drop']
except Exception as init_error:
    helper.init_failure(init_error)
@helper.create
@helper.update
def create(event, context):
    """Start the Athena query that creates (or replaces) the view.

    Registered with crhelper for both create and update requests. The query
    text, database and workgroup come from the module-level ``QUERY_CREATE``,
    ``DATABASE`` and ``ATHENA_WORKGROUP`` values read from the environment.

    On success the query execution id is stored in ``helper.Data['id']``;
    ``poll_create`` reads it back from ``event['CrHelperData']['id']``.

    Args:
        event: The custom-resource event (not inspected here).
        context: The Lambda context (not inspected here).

    Raises:
        ValueError: If the query could not be started or Athena already
            reports it as ``FAILED``. The underlying error is chained.
    """
    logger.info("View creation started")
    try:
        execution = client.start_query_execution(
            QueryString=QUERY_CREATE,
            QueryExecutionContext={'Database': DATABASE},
            # Use the workgroup passed in through the environment rather than
            # a hard-coded name, so the function follows the template.
            WorkGroup=ATHENA_WORKGROUP,
        )
        logger.info(execution)
        query_id = execution['QueryExecutionId']

        # One immediate status check catches queries that are rejected right away.
        status = client.get_query_execution(QueryExecutionId=query_id)
        logger.info(status)
        if status['QueryExecution']['Status']['State'] == 'FAILED':
            logger.error("Query failed")
            raise ValueError("Query failed")
    except Exception as e:
        # Log the root cause with its traceback and keep it attached to the
        # error that is reported for the custom resource.
        logger.exception("An exception occurred: %s", e)
        raise ValueError("Creating custom resource failed.") from e

    helper.Data['success'] = True
    helper.Data['id'] = query_id
    helper.Data['message'] = 'query is running'
@helper.delete
def delete(event, context):
    """Start the Athena query that drops the view.

    Deletion is best-effort: any error is logged and swallowed, so this
    function never raises.

    Args:
        event: The custom-resource event (not inspected here).
        context: The Lambda context (not inspected here).
    """
    logger.info("View deletion started")
    try:
        execution = client.start_query_execution(
            QueryString=QUERY_DROP,
            QueryExecutionContext={'Database': DATABASE},
            # Same workgroup source as the create path: the environment value,
            # not a hard-coded name.
            WorkGroup=ATHENA_WORKGROUP,
        )
        logger.info(execution)
    except Exception:
        # Deliberately not re-raised; the traceback goes to the log instead
        # of plain stdout prints.
        logger.exception("An exception occurred")
@helper.poll_create
def poll_create(event, context):
    """Check whether the view-creation query has finished.

    Looks up the query execution whose id was stored by ``create`` and is
    available as ``event['CrHelperData']['id']``.

    Args:
        event: The polling event; must contain ``CrHelperData.id``.
        context: The Lambda context (not inspected here).

    Returns:
        True when the query state is ``SUCCEEDED`` (creation is complete and
        a resource id will be generated), False when the query is still in
        progress and polling should continue.

    Raises:
        ValueError: If the query state is ``FAILED`` or ``CANCELLED``.
    """
    logger.info("Pol creation")
    response = client.get_query_execution(QueryExecutionId=event['CrHelperData']['id'])
    logger.info(f"Poll response: {response}")

    state = response['QueryExecution']['Status']['State']

    # Terminal failure states: stop polling and fail the creation. CANCELLED
    # is included because a cancelled query never reaches SUCCEEDED, so
    # polling it again could only end in a timeout.
    if state in ('FAILED', 'CANCELLED'):
        logger.error("Query %s", state.lower())
        raise ValueError(f"Query {state.lower()}")

    if state == 'SUCCEEDED':
        logger.info("Query SUCCEEDED")
        return True

    # Any other state (e.g. QUEUED, RUNNING) means the query is still in
    # progress; returning False asks for another poll.
    logger.info("Query %s", state)
    return False
def handler(event, context):
    """Lambda entry point: pass the event and context straight to the crhelper object."""
    helper(event, context)
Athena 视图创建/更新/删除的查询作为环境变量传递给 Lambda。在 CloudFormation 模板中,我们必须定义调用上述 Python 代码并创建/更新/删除 Athena
视图的 Lambda。例如
# Lambda function that runs the view-management Python code. The create and
# drop statements are handed to it as environment variables.
AthenaCommonViewLambda:
  Type: 'AWS::Lambda::Function'
  DependsOn: [CreateAthenaViewLayer, CreateAthenaViewLambdaRole]
  Properties:
    Environment:
      Variables:
        athena_workgroup: !Ref AudienceAthenaWorkgroup
        database:
          Ref: DatabaseName
        query_create: !Sub >-
          CREATE OR REPLACE VIEW ${TableName}_view AS
          SELECT field1, field2, ...
          FROM ${DatabaseName}.${TableName}
        # Must name the same view as query_create; otherwise the view that
        # was created is left behind when the stack is deleted.
        query_drop: !Sub DROP VIEW IF EXISTS ${TableName}_view
    Code:
      S3Bucket: !Ref SourceS3Bucket
      S3Key: createview.zip
    FunctionName: !Sub '${AWS::StackName}_create_common_view'
    Handler: createview.handler
    MemorySize: 128
    Role: !GetAtt CreateAthenaViewLambdaRole.Arn
    Runtime: python3.8
    Timeout: 60
    Layers:
      - !Ref CreateAthenaViewLayer
# Custom resource whose requests are served by the AthenaCommonViewLambda
# function through its ARN.
AthenaCommonView:
  Type: Custom::AthenaCommonView
  Properties:
    ServiceToken:
      Fn::GetAtt: [AthenaCommonViewLambda, Arn]
# Declares the view directly as a Glue table of type VIRTUAL_VIEW, with the
# Presto view definition embedded as base64-encoded JSON.
SomeView:
  Type: AWS::Glue::Table
  Properties:
    CatalogId: !Ref AWS::AccountId
    DatabaseName: !Ref Database
    TableInput:
      Name: v_some_table
      TableType: VIRTUAL_VIEW
      Parameters: {"presto_view": "true", "comment": "Presto View"}
      StorageDescriptor:
        SerdeInfo:
          SerializationLibrary: org.openx.data.jsonserde.JsonSerDe
        # Column names here must match the names used in the view SQL and in
        # the "columns" list of the encoded JSON below.
        Columns:
          - Name: column1
            Type: bigint
          - Name: column2
            Type: int
      ViewExpandedText: '/* Presto View */'
      # Assembled as '/* Presto View: ' + base64(JSON) + ' */'. The JSON has to
      # be strictly valid, so no trailing comma after the last array element.
      ViewOriginalText: !Join
        - ''
        - - '/* Presto View: '
          - Fn::Base64: !Sub |
              {
                "originalSql": "SELECT\n \"column1\"\n, \"column2\"\n\nFROM\n some_table\n",
                "catalog": "catalog",
                "schema": "${Database}",
                "columns": [
                  {
                    "name": "column1",
                    "type": "bigint"
                  },
                  {
                    "name": "column2",
                    "type": "integer"
                  }
                ]
              }
          - ' */'
# Athena named query holding a CREATE OR REPLACE VIEW statement for the
# transit-gateway chargeback view.
# NOTE(review): a NamedQuery presumably only saves the statement in Athena;
# confirm whether it still has to be run for the view to exist.
AthenaQueryCW:
  Type: AWS::Athena::NamedQuery
  Properties:
    Database: !Ref DatabaseName
    Description: Provides a chargeback view of joined data from cloudwath and transigateway
    Name: transit_gateway_chargeback_cw
    # !Sub fills in ${DatabaseName} in the two table references of the query.
    # The "summary" CTE joins transit_gateway_data to transit_gateway_chargeback
    # on account, year and month; the outer SELECT derives the chargeback
    # percentage and cost from its totals.
    QueryString: !Sub
      CREATE OR REPLACE VIEW "transit_gateway_chargeback_to_customer_final_results" AS
      WITH
      summary AS (
      SELECT
      *
      , (sum(bytesin[1]) + sum(bytesout[1]) OVER (PARTITION BY "bill_payer_account_id")) as total_data_transfer
      , (sum(bytesin[1]) + sum(bytesout[1])) as usage_account_dx_total
      FROM
      ${DatabaseName}.transit_gateway_data a
      LEFT JOIN ${DatabaseName}.transit_gateway_chargeback b ON a.customeraccount = b.line_item_usage_account_id AND a.year = b.year AND a.month = b.month
      GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30
      ORDER BY bytesIn DESC
      )
      SELECT
      *
      , (usage_account_dx_total / total_data_transfer) as chargeback_percentage
      , ("total_Networking_cost" * (usage_account_dx_total / total_data_transfer)) as chargeback_cost_final
      FROM
      summary