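I have a use case where, after inserting a row into a MySQL table, I want to get back the id of the row that was just inserted. I want to run a MySQL query in AWS Glue to do this. I'm new to PySpark and couldn't find out how. My custom query is shown first, followed by the code I use to connect to the database.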
INSERT INTO users (name,email) VALUES ('abc1234','[email protected]');
SELECT LAST_INSERT_ID();
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# Create a SparkContext and GlueContext
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Define your RDS database endpoint, username, and password
source_host = "xxx"
source_db = "xxx"
source_port = "3306"
source_driver = "jdbc"
source_db_type = "mysql"
source_user = "xxx"
source_password = "xxx"
# Define the JDBC URL
source_connection_url = f'{source_driver}:{source_db_type}://{source_host}:{source_port}/{source_db}'
source_table = "users"
source_connection_mysql5_options = {
    "url": source_connection_url,
    "dbtable": source_table,
    "user": source_user,
    "password": source_password
}
# Read the table into a DynamicFrame and print its schema
source_dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="mysql", connection_options=source_connection_mysql5_options)
source_dynamic_frame.printSchema()
With MySQL you can create the id column using AUTO_INCREMENT:
CREATE TABLE table_name (
id MEDIUMINT NOT NULL AUTO_INCREMENT,
name CHAR(30) NOT NULL,
PRIMARY KEY (id));
Then you can use the DynamicFrame's filter() method:
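# Find the maximum id via the Spark DataFrame API, then keep only that record.
# Note: the highest id is the last inserted row only if no other inserts ran concurrently.
max_value = source_dynamic_frame.toDF().agg({"id": "max"}).collect()[0][0]
filtered_dynamic_frame = source_dynamic_frame.filter(lambda x: x["id"] == max_value)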
Solution 1:
See this: Defining connection options.
You can add the sampleQuery property, where you can supply a custom query.
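A minimal sketch of Solution 1, assuming the sampleQuery connection option is honoured for JDBC sources in your Glue version. The helper build_mysql_options and the query text are hypothetical and only illustrate where the property goes; whether Glue accepts this particular query shape should be checked against the documentation.

```python
def build_mysql_options(url, user, password, table, sample_query=None):
    """Build a connection_options dict for a Glue JDBC (MySQL) source.

    build_mysql_options is a hypothetical helper, not part of awsglue.
    """
    options = {
        "url": url,
        "dbtable": table,
        "user": user,
        "password": password,
    }
    if sample_query is not None:
        # sampleQuery carries the custom SQL to run against the source
        options["sampleQuery"] = sample_query
    return options


options = build_mysql_options(
    url="jdbc:mysql://xxx:3306/xxx",
    user="xxx",
    password="xxx",
    table="users",
    # Illustrative query only (an assumption, not taken from the question)
    sample_query="SELECT MAX(id) AS last_id FROM users",
)

# Inside a Glue job, the dict would be passed the same way as in the question:
# frame = glueContext.create_dynamic_frame.from_options(
#     connection_type="mysql", connection_options=options)
```

Keeping the options in a plain dict makes it easy to switch between reading the whole table and reading the result of a custom query.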
Solution 2 (to be verified):
For example, you can specify your query in dbtable:
source_connection_mysql_options = {
    "url": source_connection_url,
    "dbtable": "(SELECT LAST_INSERT_ID()) AS lastInserted",
    "user": source_user,
    "password": source_password
}
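One thing to check for Solution 2: MySQL keeps LAST_INSERT_ID() per connection, so a Glue read that opens its own JDBC connection may not see the id generated by an INSERT issued on a different connection. The sketch below illustrates that per-connection behaviour using Python's built-in sqlite3 module as a stand-in for MySQL (an assumption made so the example runs without a server). The table layout and the e-mail value are placeholders.

```python
import os
import sqlite3
import tempfile

# sqlite3 stands in for MySQL here only so the sketch runs without a server;
# last_insert_rowid() is SQLite's counterpart of MySQL's LAST_INSERT_ID().
db_path = os.path.join(tempfile.mkdtemp(), "demo.db")

writer = sqlite3.connect(db_path)
writer.execute(
    "CREATE TABLE users ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, email TEXT)"
)
cur = writer.execute(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    ("abc1234", "user@example.com"),  # placeholder values
)
writer.commit()

# Same connection that ran the INSERT: the generated id is visible.
same_conn_id = writer.execute("SELECT last_insert_rowid()").fetchone()[0]

# A separate connection, like a fresh JDBC session: no insert happened here.
reader = sqlite3.connect(db_path)
other_conn_id = reader.execute("SELECT last_insert_rowid()").fetchone()[0]

print(same_conn_id, cur.lastrowid, other_conn_id)

writer.close()
reader.close()
```

Here the connection that performed the insert reports the generated id, while the second connection reports 0. If the same holds for your Glue job, the id has to be read on the connection that performed the INSERT, for example through the cursor's lastrowid attribute.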
Solution 3:
Create a MySQL view based on your custom query, and use that view as the source in Glue.