我在 s3 中存储了一些 json 文件,我需要在它们所在的文件夹中将它们转换为 csv 格式。
目前我正在使用glue 将它们映射到athena,但是,正如我所说,现在我需要将它们映射到csv。
是否可以使用 Glue JOB 来做到这一点?
我试图了解胶水作业是否可以爬入我的 s3 文件夹目录,将其找到的所有 json 文件转换为 csv(作为新文件)。
如果不可能,是否有任何 AWS 服务可以帮助我做到这一点?
编辑1:
这是我正在尝试运行的当前代码
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
sc = SparkContext()
glueContext = GlueContext(sc)
inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://agco-sa-dfs-dv/dealer-data"]}, format = "json")
outputGDF = glueContext.write_dynamic_frame.from_options(frame = inputGDF, connection_type = "s3", connection_options = {"path": "s3://agco-sa-dfs-dv/dealer-data"}, format = "csv")
作业运行没有错误,但 s3 文件夹上似乎没有任何反应。 我假设代码将从 /dealer-data 获取 json 文件并将其转换为同一文件夹(如 csv)。我可能错了。
编辑2:
好吧,我几乎让它按照我需要的方式工作了。
问题是,创建动态框架仅适用于包含文件的文件夹,不适用于包含文件子文件夹的文件夹。
import sys
import logging
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
sc = SparkContext()
glueContext = GlueContext(sc)
inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://agco-sa-dfs-dv/dealer-data/installations/3555/2019/2"]}, format = "json")
outputGDF = glueContext.write_dynamic_frame.from_options(frame = inputGDF, connection_type = "s3", connection_options = {"path": "s3://agco-sa-dfs-dv/dealer-data/installations/3555/2019/2/bla.csv"}, format = "csv")
上面的方法有效,但仅适用于该目录(../2) 有没有办法读取给定文件夹和子文件夹的所有文件?
您应该将
recurse
选项设置为 True
以进行 S3 连接:
inputGDF = glueContext.create_dynamic_frame_from_options(
connection_type = "s3",
connection_options = {
"paths": ["s3://agco-sa-dfs-dv/dealer-data/installations/3555/2019/2"],
"recurse" : True
},
format = "json
)
这里是如何从 JSON 转换为 CSV 的工作示例。您可以通过可视化 ETL 来实现:
这是带有参数的生成代码:
# Script generated for node Amazon S3
AmazonS3_node1698837830668 = glueContext.create_dynamic_frame.from_options(
format_options={"multiline": False},
connection_type="s3",
format="json",
connection_options={"paths": ["s3://raw-data-rwt6y"]},
transformation_ctx="AmazonS3_node1698837830668",
)
# Script generated for node Explode Array Or Map Into Rows
ExplodeArrayOrMapIntoRows_node1698843098805 = AmazonS3_node1698837830668.gs_explode(
colName="data", newCol="data"
)
# Script generated for node Flatten
Flatten_node1698843497504 = ExplodeArrayOrMapIntoRows_node1698843098805.gs_flatten()
# Script generated for node Amazon S3
AmazonS3_node1698837833637 = glueContext.write_dynamic_frame.from_options(
frame=Flatten_node1698843497504,
connection_type="s3",
format="csv",
connection_options={"path": "s3://clean-zone-byef0c", "partitionKeys": []},
transformation_ctx="AmazonS3_node1698837833637",
)
job.commit()
输入JSON:
{"data":[{"p":34373.33,"s":"USD","t":1698819194578,"v":0.0012},{"p":34373.33,"s":"USD","t":1698819194578,"v":0.01303}]}
输出 CSV:
data.p,data.s,data.t,data.v
34373.33,USD,1698819194578,0.0012
34373.33,USD,1698819194578,0.01303