我正在寻找一个批量加载器,用于将胶水作业加载到RDS,使用PySpark脚本和DataFormatWriter。我有这个为RedShift工作如下:
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcconf.get("url") + '/' + DATABASE + '?user=' + jdbcconf.get('user') + '&password=' + jdbcconf.get('password')) \
.option("dbtable", TABLE_NAME) \
.option("tempdir", args["TempDir"]) \
.option("forward_spark_s3_credentials", "true") \
.mode("overwrite") \
.save()
上面定义了df
以读取文件。在RDS中而不是在REDSHIFT中,我可以采取的最佳方法是什么?
在RDS中,您只是APPEND / OVERWRITE,在这种情况下,您可以创建RDS JDBC连接,并使用如下所示的内容:
postgres_url="jdbc:postgresql://localhost:portnum/sakila?user=<user>&password=<pwd>"
df.write.jdbc(postgres_url,table="actor1",mode="append") #for append
df.write.jdbc(postgres_url,table="actor1",mode="overwrite") #for overwrite
如果它涉及UPSERTS,那么可能你可以使用MYSQL库作为外部python库,并执行INSERT INTO ..... ON DUPLICATE KEY。
请参考此网址:How to use JDBC source to write and read data in (Py)Spark?
问候
座位
我了解到这只能通过JDBC完成。例如。
df.write.format("jdbc") \
.option("url", jdbcconf.get("url") + '/' + REDSHIFT_DATABASE + '?user=' + jdbcconf.get('user') + '&password=' + jdbcconf.get('password')) \
.option("dbtable", REDSHIFT_TABLE_NAME) \
.option("tempdir", args["TempDir"]) \
.option("forward_spark_s3_credentials", "true") \
.mode("overwrite") \
.save()