Creating an AWS Athena View Using a CloudFormation Template

Question (votes: 0, answers: 5)

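Is it possible to create an Athena view through a CloudFormation template? I can create the view using the Athena dashboard, but I want to do it programmatically with a CF template. I could not find any details in the AWS documentation, so I am not sure whether it is supported.

Thanks.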

amazon-web-services amazon-athena
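5 Answers
3 votes

It is possible to create views with CloudFormation, it's just very, very complicated. Athena views are stored in the Glue Data Catalog, just like databases and tables. In fact, Athena views are tables in the Glue Data Catalog, just with slightly different contents. For a full description of how to create a view programmatically, see this answer, which will give you an idea of the complexity:

Create AWS Athena view programmatically

It is possible to map that to CloudFormation, but I would not recommend it. If you want to create databases and tables with CloudFormation, the resources are: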

AWS::Glue::Database

AWS::Glue::Table
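
To see that a view really is just a Glue table, it can be read back through the Glue API. The following is a minimal sketch, not a definitive implementation: it assumes the view was created by Athena and therefore carries the `/* Presto View: <base64 JSON> */` payload shown in another answer on this page. The function names are hypothetical, and `glue_client` is expected to be something like `boto3.client("glue")`.

```python
import base64
import json

# Wrapper that Athena puts around the base64-encoded view definition.
PREFIX = "/* Presto View: "
SUFFIX = " */"


def decode_view_definition(view_original_text):
    """Return the JSON definition embedded in a view's ViewOriginalText."""
    if not (view_original_text.startswith(PREFIX) and view_original_text.endswith(SUFFIX)):
        raise ValueError("not a Presto view definition")
    encoded = view_original_text[len(PREFIX):-len(SUFFIX)]
    return json.loads(base64.b64decode(encoded))


def get_view_sql(glue_client, database, view_name):
    """Fetch a Glue table and return the SQL of the view it represents."""
    table = glue_client.get_table(DatabaseName=database, Name=view_name)["Table"]
    if table.get("TableType") != "VIRTUAL_VIEW":
        raise ValueError(f"{view_name} is not a view")
    return decode_view_definition(table["ViewOriginalText"])["originalSql"]
```

Calling `get_view_sql(boto3.client("glue"), "my_database", "v_some_table")` (both names are placeholders) should return the SQL text stored for that view.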


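2 votes

[...] does not really apply to the data inside [...], which is usually kept separately from other infrastructure. For Amazon Athena, AWS CloudFormation only supports:

  • Data catalog
  • Named query
  • Workgroup

The closest to your requirement is a named query, which (I think) can store a query that creates the view (e.g. CREATE VIEW ...).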

See:

AWS::Athena::NamedQuery - AWS CloudFormation
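
Note that a named query only stores the SQL text; something still has to execute it before the view exists. A minimal sketch of that step with boto3 is shown below. The function name is hypothetical, `athena_client` is expected to be something like `boto3.client("athena")`, and the sketch assumes the workgroup already has a query result location configured.

```python
def run_named_query(athena_client, named_query_id, workgroup="primary"):
    """Look up a stored named query and start executing it.

    Returns the QueryExecutionId, which can be polled for completion.
    """
    named_query = athena_client.get_named_query(NamedQueryId=named_query_id)["NamedQuery"]
    response = athena_client.start_query_execution(
        QueryString=named_query["QueryString"],
        QueryExecutionContext={"Database": named_query["Database"]},
        # Assumption: this workgroup defines where query results are written.
        WorkGroup=workgroup,
    )
    return response["QueryExecutionId"]
```

The call returns as soon as the query is submitted, so the caller still needs to check `get_query_execution` to learn whether the `CREATE VIEW` statement succeeded.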

Update:

@Theo pointed out that AWS CloudFormation also has AWS Glue resources, including:

  • AWS::Glue::Table

This can apparently be used to create a view. See the comments below.


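2 votes

The best way to create an Athena view from a CloudFormation template is to use a custom resource backed by a Lambda function. We have to supply handlers for creating and deleting the view. For example, using the crhelper library, the Lambda can be defined as: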
import logging
import os

import boto3
from crhelper import CfnResource

logger = logging.getLogger(__name__)
helper = CfnResource(json_logging=False, log_level='DEBUG', boto_level='CRITICAL', sleep_on_delete=120)

try:
    client = boto3.client('athena')
    ATHENA_WORKGROUP = os.environ['athena_workgroup']
    DATABASE = os.environ['database']
    QUERY_CREATE = os.environ['query_create']
    QUERY_DROP = os.environ['query_drop']
except Exception as e:
    helper.init_failure(e)

@helper.create
@helper.update
def create(event, context):
    logger.info("View creation started")

    try:
        executionResponse = client.start_query_execution(
            QueryString=QUERY_CREATE,
            QueryExecutionContext={'Database': DATABASE},
            WorkGroup=ATHENA_WORKGROUP  # use the workgroup passed in via the environment
        )
        logger.info(executionResponse)

        response = client.get_query_execution(QueryExecutionId=executionResponse['QueryExecutionId'])
        logger.info(response)

        if response['QueryExecution']['Status']['State'] == 'FAILED':
            logger.error("Query failed")
            raise ValueError("Query failed")

        helper.Data['success'] = True
        helper.Data['id'] = executionResponse['QueryExecutionId']
        helper.Data['message'] = 'query is running'

    except Exception:
        # Log the full traceback; the check below fails the resource.
        logger.exception("Starting the CREATE VIEW query failed")

    if not helper.Data.get("success"):
        raise ValueError("Creating custom resource failed.")

    return


@helper.delete
def delete(event, context):
    logger.info("View deletion started")

    try:
        executionResponse = client.start_query_execution(
            QueryString=QUERY_DROP,
            QueryExecutionContext={'Database': DATABASE},
            WorkGroup=ATHENA_WORKGROUP  # use the workgroup passed in via the environment
        )
        logger.info(executionResponse)

    except Exception:
        # Swallow the error so that a failed DROP does not block stack deletion.
        logger.exception("Starting the DROP VIEW query failed")

@helper.poll_create
def poll_create(event, context):
    logger.info("Polling view creation")

    response = client.get_query_execution(QueryExecutionId=event['CrHelperData']['id'])

    logger.info(f"Poll response: {response}")

    # An Athena query is in one of five states:
    # FAILED or CANCELLED - we stop and fail creation
    # SUCCEEDED - we stop and succeed creation
    # QUEUED or RUNNING - we continue polling in 2 minutes
    state = response['QueryExecution']['Status']['State']

    if state in ('FAILED', 'CANCELLED'):
        logger.error(f"Query {state}")
        raise ValueError(f"Query {state}")

    if state == 'SUCCEEDED':
        logger.info("Query SUCCEEDED")
        return True

    logger.info(f"Query {state}, polling again")

    # Return a resource id or True to indicate that creation is complete. if True is returned an id
    # will be generated
    # Return false to indicate that creation is not complete and we need to poll again
    return False

def handler(event, context):
    helper(event, context)
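
The decision made in the poll handler can be isolated into a small pure function, which makes it easy to test without AWS access. This is only a sketch: the function name is hypothetical, and it relies on the five query states Athena reports (QUEUED, RUNNING, SUCCEEDED, FAILED, CANCELLED) and on the optional `StateChangeReason` field of the status.

```python
# States from which an Athena query will never succeed.
TERMINAL_FAILURES = {"FAILED", "CANCELLED"}


def is_query_complete(query_execution_response):
    """Map a get_query_execution response to a polling decision.

    Returns True when the query succeeded, False when polling should
    continue, and raises ValueError when the query cannot succeed anymore.
    """
    status = query_execution_response["QueryExecution"]["Status"]
    state = status["State"]
    if state == "SUCCEEDED":
        return True
    if state in TERMINAL_FAILURES:
        reason = status.get("StateChangeReason", "no reason given")
        raise ValueError(f"Athena query {state}: {reason}")
    # QUEUED or RUNNING: not finished yet, poll again later.
    return False
```

Inside `poll_create`, the body could then reduce to `return is_query_complete(response)`.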

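The queries for creating/updating and deleting the Athena view are passed to the Lambda as environment variables. In the CloudFormation template we have to define the Lambda that runs the Python code above and creates/updates/deletes the Athena view. For example: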
  AthenaCommonViewLambda:
    Type: 'AWS::Lambda::Function'
    DependsOn: [CreateAthenaViewLayer, CreateAthenaViewLambdaRole]
    Properties:
      Environment:
        Variables:
          athena_workgroup: !Ref AudienceAthenaWorkgroup
          database:
            Ref: DatabaseName
          query_create: !Sub >-
            CREATE OR REPLACE VIEW ${TableName}_view AS
            SELECT field1, field2, ...
            FROM ${DatabaseName}.${TableName}
          query_drop: !Sub DROP VIEW IF EXISTS ${TableName}_view
      Code:
        S3Bucket: !Ref SourceS3Bucket
        S3Key: createview.zip
      FunctionName: !Sub '${AWS::StackName}_create_common_view'
      Handler: createview.handler
      MemorySize: 128
      Role: !GetAtt CreateAthenaViewLambdaRole.Arn
      Runtime: python3.8
      Timeout: 60
      Layers:
        - !Ref CreateAthenaViewLayer

  AthenaCommonView:
    Type: 'Custom::AthenaCommonView'
    Properties:
      ServiceToken: !GetAtt AthenaCommonViewLambda.Arn



0 votes

  SomeView:
    Type: AWS::Glue::Table
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseName: !Ref Database
      TableInput:
        Name: v_some_table
        TableType: VIRTUAL_VIEW
        Parameters: {"presto_view": "true", "comment": "Presto View"}
        StorageDescriptor:
          SerdeInfo:
            SerializationLibrary: org.openx.data.jsonserde.JsonSerDe
          Columns:
            - Name: column1
              Type: bigint
            - Name: column2
              Type: int
        ViewExpandedText: '/* Presto View */'
        ViewOriginalText: !Join
          - ''
          - - '/* Presto View: '
            - Fn::Base64: !Sub |
                {
                  "originalSql": "SELECT\n \"column1\"\n, \"column2\"\n\nFROM\n some_table\n",
                  "catalog": "catalog",
                  "schema": "${Database}",
                  "columns": [
                    { "name": "column1", "type": "bigint" },
                    { "name": "column2", "type": "integer" }
                  ]
                }
            - ' */'
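
The `ViewOriginalText` value in the template is a JSON document that is base64-encoded and wrapped in a comment marker. A minimal Python sketch of the same construction may make the format easier to follow, for example when generating or checking the value outside CloudFormation. The function name and the default catalog value `awsdatacatalog` are assumptions and do not come from the template above.

```python
import base64
import json


def build_view_original_text(sql, database, columns, catalog="awsdatacatalog"):
    """Build the ViewOriginalText string for a Presto-style Athena view.

    `columns` is a list of (name, presto_type) pairs, e.g. ("column1", "bigint").
    """
    definition = {
        "originalSql": sql,
        "catalog": catalog,  # assumption: the default Athena catalog name
        "schema": database,
        "columns": [{"name": name, "type": type_} for name, type_ in columns],
    }
    encoded = base64.b64encode(json.dumps(definition).encode("utf-8")).decode("ascii")
    return f"/* Presto View: {encoded} */"
```

The column types inside the JSON use Presto names (such as `integer`), while the `StorageDescriptor` columns in the template use Hive names (such as `int`), so the two lists have to be kept in step by hand.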



0 votes

  AthenaQueryCW:
    Type: AWS::Athena::NamedQuery
    Properties:
      Database: !Ref DatabaseName
      Description: Provides a chargeback view of joined data from CloudWatch and Transit Gateway
      Name: transit_gateway_chargeback_cw
      QueryString: !Sub |
        CREATE OR REPLACE VIEW "transit_gateway_chargeback_to_customer_final_results" AS
        WITH summary AS (
          SELECT *
          , (sum(bytesin[1]) + sum(bytesout[1]) OVER (PARTITION BY "bill_payer_account_id")) as total_data_transfer
          , (sum(bytesin[1]) + sum(bytesout[1])) as usage_account_dx_total
          FROM ${DatabaseName}.transit_gateway_data a
          LEFT JOIN ${DatabaseName}.transit_gateway_chargeback b
            ON a.customeraccount = b.line_item_usage_account_id AND a.year = b.year AND a.month = b.month
          GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30
          ORDER BY bytesIn DESC
        )
        SELECT *
        , (usage_account_dx_total / total_data_transfer) as chargeback_percentage
        , ("total_Networking_cost" * (usage_account_dx_total / total_data_transfer)) as chargeback_cost_final
        FROM summary
