我创建了一个非常基本的 AWS Glue ETL 作业,用于从数据目录中选择一些字段,该数据目录是通过我指向 RDS 数据库的爬网程序构建的。返回数据集后,我将结果导出为 CSV 格式。然而,这是有效的;输出生成大约 20 个独特的文件。数据集现在只有两行,因此只有两个文件包含数据,其余文件仅显示列标题,没有第二行。我的要求是拥有一个包含从数据集中选择的所有数据的 CSV 文件。我尝试过重新分区和合并功能均未成功。我能够生成单个文件,但我的数据丢失了。我是 AWS Glue 的新手,一直无法弄清楚这一点,因此我们将不胜感激任何建议。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame
def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
for alias, frame in mapping.items():
frame.toDF().createOrReplaceTempView(alias)
result = spark.sql(query)
return DynamicFrame.fromDF(result, glueContext, transformation_ctx)
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node PostgreSQL
PostgreSQL_node1644981751584 = glueContext.create_dynamic_frame.from_catalog(
database="newApp",
table_name="database_schema_staging_hdr",
transformation_ctx="PostgreSQL_node1644981751584",
)
# Script generated for node SQL
SqlQuery0 = """
select * from myDataSource
"""
SQL_node1644981807578 = sparkSqlQuery(
glueContext,
query=SqlQuery0,
mapping={"myDataSource": PostgreSQL_node1644981751584},
transformation_ctx="SQL_node1644981807578",
)
# Script generated for node Amazon S3
AmazonS3_node1644981816657 = glueContext.write_dynamic_frame.from_options(
frame=SQL_node1644981807578,
connection_type="s3",
format="csv",
connection_options={"path": "s3://awsglueetloutput/", "partitionKeys": []},
transformation_ctx="AmazonS3_node1644981816657",
)
job.commit()
您必须重新分区 DynamicFrame 才能实现这一点。
最后有 1 个文件的示例:
SQL_node1644981807578 = SQL_node1644981807578.repartition(1)
我有同样的问题,但我不想失去可视化 ETL 工具(如果你编辑脚本,你不会后悔并返回可视化编辑器)。
您需要在 SQL 查询和 S3 输出之间添加两个节点:
SQL 查询 > 自定义转换 > 从集合中选择 > S3 输出。
def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
new_dfc = dfc.select(list(dfc.keys())[0]).repartition(1)
return(DynamicFrameCollection({"CustomTransform0": new_dfc}, glueContext))
它会使用
DynamicFrameCollection
使用 SQL 生成 repartition(1)
。
此节点是必需的,因为 S3 需要
DynamicFrame
而不是 DynamicFrameCollection
(在自定义变换中生成)。
通过此工作流程,您将在 S3 中生成一个 单个文件。