有谁知道如何将源文件名添加为胶水作业中的列?
我们创建了一个流程,在其中抓取 S3 中的一些文件以创建架构。然后,我们编写了一个作业,将文件转换为新格式,并将这些文件以 CSV 形式写回另一个 S3 存储桶,以供管道的其余部分使用。我们想要做的是访问某种作业元属性,以便我们可以向包含原始文件名的输出文件添加一个新列。
我查看了 AWS 文档和 aws-glue-libs 源代码,但没有看到任何内容。理想情况下,会有某种方法从
awsglue.job
包获取元数据(我们使用的是 python 风格)。
我仍在学习胶水,所以如果我使用了错误的术语,请道歉。我也用 Spark 标签标记了它,因为我相信这就是 Glue 在幕后使用的。
您可以在 etl 工作中使用 Spark 来做到这一点:
var df = glueContext.getCatalogSource(
database = database,
tableName = table,
transformationContext = s"source-$database.$table"
).getDynamicFrame()
.toDF()
.withColumn("input_file_name", input_file_name())
glueContext.getSinkWithFormat(
connectionType = "s3",
options = JsonOptions(Map(
"path" -> args("DST_S3_PATH")
)),
transformationContext = "",
format = "parquet"
).writeDynamicFrame(DynamicFrame(df, glueContext))
记住它仅适用于 getCatalogSource() API,不适用于 create_dynamic_frame_from_options()
使用 AWS Glue Python 自动生成的脚本,我添加了以下几行:
from pyspark.sql.functions import input_file_name
## Add the input file name column
datasource1 = datasource0.toDF().withColumn("input_file_name", input_file_name())
## Convert DataFrame back to DynamicFrame
datasource2 = datasource0.fromDF(datasource1, glueContext, "datasource2")
然后,在代码的
ApplyMapping
或 datasink
部分中,引用 datasource2
。
我正在使用 AWS Glue Python 自动生成的脚本。我尝试使用 JcMaco 的解决方案,因为这正是我所需要的,并且这是一个非常简单的解决方案,使用
input_file_name()
。
但是,我无法让它工作,除了列标题之外,我的列总是空着,但我能够获取胶水作业的名称并将其用作新列中的常量,并且在我的这个特定用例中,它与
input_file_name()
具有相同的用途。
如果您查看脚本的左上角,您将看到
args
变量的创建位置。使用 args
访问 JOB_NAME,如下所示。
我是如何做到的:
from pyspark.sql.functions import *
job_name = args['JOB_NAME'] # define new variable
(JOB_NAME 作为命令行参数传入。)
然后,在脚本中的
datasource0
定义之后,将 job_name
与 lit
函数一起使用:
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = […] , transformation_ctx = "applymapping1")
applymapping2 = applymapping1.toDF().withColumn("job_name", lit(job_name))
applymapping3 = applymapping1.fromDF(applymapping2, glueContext, "applymapping3")
在上面的示例中,您可以将
frame
定义中的 datasink
参数的分配更改为 applymapping3
。
您还可以使用 Map 转换将函数应用于 DynamicFrame 的所有记录,在函数内部您可以添加字段,一个字段可以是作业的名称,该值可以 作为参数发送给AWS Glue 作业:
# add argument to the job
client.start_job_run(
JobName = 'my_test_Job',
Arguments = {
'--job_name': 'name', } )
# read the argument
args = getResolvedOptions(sys.argv,
['job_name'])
arg_job_name = args["job_name"]
inputDyf = glueContext.create_dynamic_frame_from_options(
...
)
def mapping(record: Dict[str, Any]):
record["Job"] = arg_job_name
return record
mapped_dyF = Map.apply(frame=inputDyf, f=mapping)
如果您使用
GlueContext.write_dynamic_frame.from_options
,则可以使用 attachFilename
中的 format_options
键添加具有源文件名 aws-glue-programming-etl-format 的列
dynamicFrame = glueContext.create_dynamic_frame.from_options(
connection_type="s3",
connection_options={"paths": ["s3://s3path"]},
format="json",
format_options={
"multiline": True,
"attachFilename": "your_filename_column_name"
}
)