AWS Glue:如何在输出中添加包含源文件名的列?

问题描述 投票:0回答:5

有谁知道如何将源文件名添加为胶水作业中的列?

我们创建了一个流程,在其中抓取 S3 中的一些文件以创建架构。然后,我们编写了一个作业,将文件转换为新格式,并将这些文件以 CSV 形式写回另一个 S3 存储桶,以供管道的其余部分使用。我们想要做的是访问某种作业元属性,以便我们可以向包含原始文件名的输出文件添加一个新列。

我查看了 AWS 文档和 aws-glue-libs 源代码,但没有看到任何内容。理想情况下,会有某种方法从

awsglue.job
包获取元数据(我们使用的是 python 风格)。

我仍在学习胶水,所以如果我使用了错误的术语,请道歉。我也用 Spark 标签标记了它,因为我相信这就是 Glue 在幕后使用的。

amazon-web-services apache-spark pyspark aws-glue
5个回答
4
投票

您可以在 etl 工作中使用 Spark 来做到这一点:

var df = glueContext.getCatalogSource(
  database = database,
  tableName = table,
  transformationContext = s"source-$database.$table"
).getDynamicFrame()
 .toDF()
 .withColumn("input_file_name", input_file_name())

glueContext.getSinkWithFormat(
  connectionType = "s3",
  options = JsonOptions(Map(
    "path" -> args("DST_S3_PATH")
  )),
  transformationContext = "",
  format = "parquet"
).writeDynamicFrame(DynamicFrame(df, glueContext))

记住它仅适用于 getCatalogSource() API,不适用于 create_dynamic_frame_from_options()


4
投票

使用 AWS Glue Python 自动生成的脚本,我添加了以下几行:

from pyspark.sql.functions import input_file_name

## Add the input file name column
datasource1 = datasource0.toDF().withColumn("input_file_name", input_file_name())

## Convert DataFrame back to DynamicFrame
datasource2 = datasource0.fromDF(datasource1, glueContext, "datasource2")

然后,在代码的

ApplyMapping
datasink
部分中,引用
datasource2


1
投票

我正在使用 AWS Glue Python 自动生成的脚本。我尝试使用 JcMaco 的解决方案,因为这正是我所需要的,并且这是一个非常简单的解决方案,使用

input_file_name()

但是,我无法让它工作,除了列标题之外,我的列总是空着,但我能够获取胶水作业的名称并将其用作新列中的常量,并且在我的这个特定用例中,它与

input_file_name()
具有相同的用途。

如果您查看脚本的左上角,您将看到

args
变量的创建位置。使用
args
访问 JOB_NAME,如下所示。

我是如何做到的:

from pyspark.sql.functions import *

job_name = args['JOB_NAME'] # define new variable

(JOB_NAME 作为命令行参数传入。)

然后,在脚本中的

datasource0
定义之后,将
job_name
lit
函数一起使用:

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = […] , transformation_ctx = "applymapping1") 
applymapping2 = applymapping1.toDF().withColumn("job_name", lit(job_name))
applymapping3 = applymapping1.fromDF(applymapping2, glueContext, "applymapping3")

在上面的示例中,您可以将

frame
定义中的
datasink
参数的分配更改为
applymapping3


0
投票

您还可以使用 Map 转换将函数应用于 DynamicFrame 的所有记录,在函数内部您可以添加字段,一个字段可以是作业的名称,该值可以 作为参数发送给AWS Glue 作业:

# add argument to the job
client.start_job_run(
               JobName = 'my_test_Job',
               Arguments = {
                 '--job_name':   'name', } )
# read the argument
args = getResolvedOptions(sys.argv,
                          ['job_name'])
arg_job_name = args["job_name"]

inputDyf = glueContext.create_dynamic_frame_from_options(
    ...
)


def mapping(record: Dict[str, Any]):
    record["Job"] = arg_job_name
    return record 

mapped_dyF = Map.apply(frame=inputDyf, f=mapping)

0
投票

如果您使用

GlueContext.write_dynamic_frame.from_options
,则可以使用
attachFilename
中的
format_options
键添加具有源文件名 aws-glue-programming-etl-format

的列
dynamicFrame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://s3path"]},
    format="json",
    format_options={
        "multiline": True,
        "attachFilename": "your_filename_column_name"
    }
)
© www.soinside.com 2019 - 2024. All rights reserved.