如何使用 PySpark 让 AWS Glue ETL 作业返回包含所有结果的单个文件?

问题描述 投票:0回答:2

我创建了一个非常基本的 AWS Glue ETL 作业,用于从数据目录中选择一些字段,该数据目录是通过我指向 RDS 数据库的爬网程序构建的。返回数据集后,我将结果导出为 CSV 格式。然而,这是有效的;输出生成大约 20 个独特的文件。数据集现在只有两行,因此只有两个文件包含数据,其余文件仅显示列标题,没有第二行。我的要求是拥有一个包含从数据集中选择的所有数据的 CSV 文件。我尝试过重新分区和合并功能均未成功。我能够生成单个文件,但我的数据丢失了。我是 AWS Glue 的新手,一直无法弄清楚这一点,因此我们将不胜感激任何建议。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame


def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)


args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node PostgreSQL
PostgreSQL_node1644981751584 = glueContext.create_dynamic_frame.from_catalog(
    database="newApp",
    table_name="database_schema_staging_hdr",
    transformation_ctx="PostgreSQL_node1644981751584",
)

# Script generated for node SQL
SqlQuery0 = """
select * from myDataSource

"""
SQL_node1644981807578 = sparkSqlQuery(
    glueContext,
    query=SqlQuery0,
    mapping={"myDataSource": PostgreSQL_node1644981751584},
    transformation_ctx="SQL_node1644981807578",
)

# Script generated for node Amazon S3
AmazonS3_node1644981816657 = glueContext.write_dynamic_frame.from_options(
    frame=SQL_node1644981807578,
    connection_type="s3",
    format="csv",
    connection_options={"path": "s3://awsglueetloutput/", "partitionKeys": []},
    transformation_ctx="AmazonS3_node1644981816657",
)


job.commit()
pyspark aws-glue aws-glue-data-catalog
2个回答
1
投票

您必须重新分区 DynamicFrame 才能实现这一点。

最后有 1 个文件的示例:

SQL_node1644981807578 = SQL_node1644981807578.repartition(1)


0
投票

我有同样的问题,但我不想失去可视化 ETL 工具(如果你编辑脚本,你不会后悔并返回可视化编辑器)。

您需要在 SQL 查询和 S3 输出之间添加两个节点:

SQL 查询 > 自定义转换 > 从集合中选择 > S3 输出。

自定义变换

def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
    new_dfc = dfc.select(list(dfc.keys())[0]).repartition(1)
    return(DynamicFrameCollection({"CustomTransform0": new_dfc}, glueContext))

它会使用

DynamicFrameCollection
使用 SQL 生成
repartition(1)

选择收藏表

此节点是必需的,因为 S3 需要

DynamicFrame
而不是
DynamicFrameCollection
(在自定义变换中生成)。

通过此工作流程,您将在 S3 中生成一个 单个文件

© www.soinside.com 2019 - 2024. All rights reserved.