将数据从Kinesis -> RDS中移动到本地使用Spark与AWS Glue实现。

问题描述 投票:0回答:1

我有一个Spark项目,AWS Glue实现在本地运行。

我监听一个Kinesis流,所以当数据以JSON格式到达时,我可以正确地存储到S3.我想存储在AWS RDS而不是存储在S3。

我已经尝试使用。

dataFrame.write
          .format("jdbc")
          .option("url","jdbc:mysql://aurora.cluster.region.rds.amazonaws.com:3306/database")
          .option("user","user")
          .option("password","password")
          .option("dbtable","test-table")
          .option("driver","com.mysql.jdbc.Driver")
          .save()

Spark项目从Kinesis流中获取数据,使用AWS胶水作业。

我想把数据添加到Aurora数据库。

它的失败与错误

Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL
 server version for the right syntax to use near '-glue-table (`label2` TEXT , `customerid` TEXT , `sales` TEXT , `name` TEXT )' a
t line 1

这是我使用的测试数据帧。dataFrame.show():

+------+----------+-----+--------------------+
|label2|customerid|sales|                name|
+------+----------+-----+--------------------+
| test6|      test| test|streamingtesttest...|
+------+----------+-----+--------------------+
apache-spark aws-glue amazon-rds-aurora
1个回答
0
投票

使用Spark DynamicFrame代替DataFrame,并使用glueContext sink发布到Aurora。

所以最终的代码可以是:

lazy val mysqlJsonOption = jsonOptions(MYSQL_AURORA_URI)

//Write to Aurora
val dynamicFrame = DynamicFrame(joined, glueContext)
glueContext.getSink("mysql", mysqlJsonOption).writeDynamicFrame(dynamicFrame)
© www.soinside.com 2019 - 2024. All rights reserved.