如何使用glue将存储在s3中的json文件转换为csv?

问题描述 投票:0回答:2

我在 s3 中存储了一些 json 文件,我需要在它们所在的文件夹中将它们转换为 csv 格式。

目前我正在使用glue 将它们映射到athena,但是,正如我所说,现在我需要将它们映射到csv。

是否可以使用 Glue JOB 来做到这一点?

我试图了解胶水作业是否可以爬入我的 s3 文件夹目录,将其找到的所有 json 文件转换为 csv(作为新文件)。

如果不可能,是否有任何 AWS 服务可以帮助我做到这一点?

编辑1:

这是我正在尝试运行的当前代码

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext()
glueContext = GlueContext(sc)

inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://agco-sa-dfs-dv/dealer-data"]}, format = "json")
outputGDF = glueContext.write_dynamic_frame.from_options(frame = inputGDF, connection_type = "s3", connection_options = {"path": "s3://agco-sa-dfs-dv/dealer-data"}, format = "csv")

作业运行没有错误,但 s3 文件夹上似乎没有任何反应。 我假设代码将从 /dealer-data 获取 json 文件并将其转换为同一文件夹(如 csv)。我可能错了。

编辑2:

好吧,我几乎让它按照我需要的方式工作了。

问题是,创建动态框架仅适用于包含文件的文件夹,不适用于包含文件子文件夹的文件夹。

import sys
import logging
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext()
glueContext = GlueContext(sc)

inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://agco-sa-dfs-dv/dealer-data/installations/3555/2019/2"]}, format = "json")

outputGDF = glueContext.write_dynamic_frame.from_options(frame = inputGDF, connection_type = "s3", connection_options = {"path": "s3://agco-sa-dfs-dv/dealer-data/installations/3555/2019/2/bla.csv"}, format = "csv")

上面的方法有效,但仅适用于该目录(../2) 有没有办法读取给定文件夹和子文件夹的所有文件?

amazon-web-services amazon-s3 aws-glue
2个回答
1
投票

您应该将

recurse
选项设置为
True
以进行 S3 连接:

inputGDF = glueContext.create_dynamic_frame_from_options(
    connection_type = "s3", 
    connection_options = {
        "paths": ["s3://agco-sa-dfs-dv/dealer-data/installations/3555/2019/2"],
        "recurse" : True
    }, 
    format = "json
)

0
投票

这里是如何从 JSON 转换为 CSV 的工作示例。您可以通过可视化 ETL 来实现:

这是带有参数的生成代码:

# Script generated for node Amazon S3
AmazonS3_node1698837830668 = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": False},
    connection_type="s3",
    format="json",
    connection_options={"paths": ["s3://raw-data-rwt6y"]},
    transformation_ctx="AmazonS3_node1698837830668",
)

# Script generated for node Explode Array Or Map Into Rows
ExplodeArrayOrMapIntoRows_node1698843098805 = AmazonS3_node1698837830668.gs_explode(
    colName="data", newCol="data"
)

# Script generated for node Flatten
Flatten_node1698843497504 = ExplodeArrayOrMapIntoRows_node1698843098805.gs_flatten()

# Script generated for node Amazon S3
AmazonS3_node1698837833637 = glueContext.write_dynamic_frame.from_options(
    frame=Flatten_node1698843497504,
    connection_type="s3",
    format="csv",
    connection_options={"path": "s3://clean-zone-byef0c", "partitionKeys": []},
    transformation_ctx="AmazonS3_node1698837833637",
)

job.commit()

输入JSON:

{"data":[{"p":34373.33,"s":"USD","t":1698819194578,"v":0.0012},{"p":34373.33,"s":"USD","t":1698819194578,"v":0.01303}]}

输出 CSV:

data.p,data.s,data.t,data.v
34373.33,USD,1698819194578,0.0012
34373.33,USD,1698819194578,0.01303
© www.soinside.com 2019 - 2024. All rights reserved.