从PySpark批量加载到AWS RDS(postgres)

问题描述 投票:0回答:2

我正在寻找一个批量加载器,用于将胶水作业加载到RDS,使用PySpark脚本和DataFormatWriter。我有这个为RedShift工作如下:

df.write \
    .format("com.databricks.spark.redshift") \
    .option("url", jdbcconf.get("url") + '/' + DATABASE + '?user=' + jdbcconf.get('user') + '&password=' + jdbcconf.get('password')) \
    .option("dbtable", TABLE_NAME) \
    .option("tempdir", args["TempDir"]) \
    .option("forward_spark_s3_credentials", "true") \
    .mode("overwrite") \
    .save()

上面定义了df以读取文件。在RDS中而不是在REDSHIFT中,我可以采取的最佳方法是什么?

amazon-web-services amazon-rds aws-glue
2个回答
0
投票

在RDS中,您只是APPEND / OVERWRITE,在这种情况下,您可以创建RDS JDBC连接,并使用如下所示的内容:

postgres_url="jdbc:postgresql://localhost:portnum/sakila?user=<user>&password=<pwd>"
df.write.jdbc(postgres_url,table="actor1",mode="append") #for append
df.write.jdbc(postgres_url,table="actor1",mode="overwrite") #for overwrite

如果它涉及UPSERTS,那么可能你可以使用MYSQL库作为外部python库,并执行INSERT INTO ..... ON DUPLICATE KEY。

请参考此网址:How to use JDBC source to write and read data in (Py)Spark?

问候

座位


0
投票

我了解到这只能通过JDBC完成。例如。

df.write.format("jdbc") \
    .option("url", jdbcconf.get("url") + '/' + REDSHIFT_DATABASE + '?user=' + jdbcconf.get('user') + '&password=' + jdbcconf.get('password')) \
    .option("dbtable", REDSHIFT_TABLE_NAME) \
    .option("tempdir", args["TempDir"]) \
    .option("forward_spark_s3_credentials", "true") \
    .mode("overwrite") \
    .save()
© www.soinside.com 2019 - 2024. All rights reserved.