如何通过 pyspark 在 s3 存储桶上写入数据帧但不使用 Hadoop

问题描述 投票:0回答:1

我想通过 pyspark 直接在 s3 存储桶上写入数据帧,但不想使用 Hadoop。 python 或 pyspark 代码中不需要 Hadoop 的任何单词。

from pyspark.sql import SparkSession

aws_access_key_id = 'ABC'
aws_secret_access_key = 'XYZ'
region_name = 'ap-south-1'
bucket_name = 'integration'
folder_name = 'NGETL-POC'

# Initialize Spark session
spark = SparkSession.builder.appName('temp1') \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider") \
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \
    .config("spark.hadoop.fs.s3a.endpoint", f"s3-{region_name}.amazonaws.com") \
    .getOrCreate()

data1 = [(1, 'abc', 'A', 1), (2, 'pqr', 'B', 2), (3, 'efg', 'C', 4), (5, 'xyz', 'D', 6)]
fileHeadersColumns = ['student_id', 'st_name', 'st_class', 'st_roll_no']
df = spark.createDataFrame(data1, fileHeadersColumns)
df.show()

data2 = [(1, 'Maths', 50), (2, 'English', 60), (1, 'English', 70), (3, 'English', 80), (4, 'English', 40), (2, 'Maths', 60), (3, 'Maths', 70), (4, 'Maths', 80)]
redisColumns = ['student_id', 'subject', 'Marks']
df1 = spark.createDataFrame(data2, redisColumns)
df1.show()

joinedDf = df.join(df1, on="student_id", how="inner")
joinedDf.show()
print("file uploaded on S3")

# Write the DataFrame to S3 as a CSV
output_path = f"s3a://{bucket_name}/{folder_name}/data_1.csv"
joinedDf.write \
    .option("header", "true") \
    .option("delimiter", ",") \
    .option("quoteAll", "true") \
    .csv(output_path)
print("file uploaded on S3 post")

“”“这里我在配置部分使用hadoop{config(“spark.hadoop.fs.s3a.aws.credentials.provider},我只想在s3存储桶上写入这个Dataframe(joinedDf)而不使用hadoop.kindly ,尽快提供解决方案。"""

dataframe amazon-s3 pyspark apache-spark-sql spark-streaming
1个回答
0
投票

简单明了。

  1. 用您选择的语言编写 s3a 连接器的完整替代品。 1周,忽略测试。

  2. Spark 文件输出代码确实使用 hadoop 文件系统 API,因此您将需要在类路径上使用 hadoop-common,除非您也替换它。完整规范以及合规性测试均已在线。 2-3 周才能让测试发挥作用,除非您尝试更换 Spark writer,这将需要更长的时间。

  3. 您还需要代码在工作线程失败的情况下提交输出,因为您知道目录重命名是非原子的并且文件重命名很慢。 EMR 和 S3A 提交者都使用分段上传,其中工作人员写入最终目标,将上传信息传播到 Spark 驱动程序,然后 Spark 驱动程序在作业提交中完成上传。有关详细信息,请参阅“零重命名提交者”。 4 周以上。

让我们知道您的进展如何。

您的另一个选择是:写入共享文件系统,然后让 Spark 驱动程序使用 s3 命令行工具上传。再说一遍,你的作业。

© www.soinside.com 2019 - 2024. All rights reserved.