在 AWS GLUE pyspark 中运行自定义 MYSQL 查询

问题描述 投票:0回答:2

我有一个用例,在 mysql 表中插入一行后,我想取回最后插入的行的 id。我想在 awsglue 中运行 MYSQL 查询以实现相同的目的。我是 pyspark 的新手,无法找到相同的内容。这是连接到数据库的代码。 我的自定义查询

INSERT INTO users (name,email) VALUES ('abc1234','[email protected]');
SELECT LAST_INSERT_ID();
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Create a SparkContext and GlueContext
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Define your RDS database endpoint, username, and password
source_host = "xxx"
source_db = "xxx"
source_port = "3306"
source_driver = "jdbc"
source_db_type = "mysql"
source_user = "xxx"
source_password = "xxx"

# Define the JDBC URL
source_connection_url = f'{source_driver}:{source_db_type}://{source_host}:{source_port}/{source_db}'
source_table = "users"
source_connection_mysql5_options = {
    "url": source_connection_url,
    "dbtable": source_table,
    "user": source_user,
    "password": source_password
}
source_dynamic_frame = glueContext.create_dynamic_frame.from_options(
                        connection_type="mysql",connection_options=source_connection_mysql5_options)
source_dynamic_frame.printSchema().
amazon-web-services pyspark aws-glue
2个回答
0
投票

使用 mysql ucan 使用 AUTO_INCRMENT 创建 id

CREATE TABLE table_name (
 id MEDIUMINT NOT NULL AUTO_INCREMENT,
 name CHAR(30) NOT NULL,
 PRIMARY KEY (id));

然后就可以使用dynamicframe的filter()了:

# Filter records based on the maximum value of a column
max_value = source_dynamic_frame.max('id')
filtered_dynamic_frame = dynamic_frame.filter(lambda x: x['id'] == max_value)

0
投票

解决方案1:

检查此:定义连接选项

您可以添加

sampleQuery
属性,您可以在其中添加自定义查询。

解决方案2:(待检查)

例如,您可以在

dbtable
中指定您的查询

source_connection_mysql_options = {
  "url": source_connection_url,
  "dbtable": "(SELECT LAST_INSERT_ID()) AS lastInserted",
  "user": source_user,
  "password": source_password
}

解决方案3:

根据您将在 Glue 中用作源的自定义查询创建 Mysql 视图

© www.soinside.com 2019 - 2024. All rights reserved.