这是我的场景。
我有一个源数据库,它是带有 Postgresql 引擎的 aurora 数据库。我有一张名为
payments
的表。该表由数百万条记录组成,因此每天结束时我需要读取数据并检查是否有任何逾期付款,如果有任何付款逾期,我需要将付款标记为“逾期” .
如何使用 AWSgluespark 作业以优化的方式实现这一目标?可以利用哪些 AWS 组件来实现此要求?
提前致谢!
您需要使用 AWS Glue 从您的 Aurora PostgreSQL
payments
表中提取数据。aws-samples/aws-glue-samples
中有很多示例,并且您还可以在 DataFrame 中包含 DataDirect JDBC 驱动程序。
Aurora DB (PostgreSQL) ──┐
│
├─> AWS Glue (ETL) ──> Process ──> S3 (Temporary Storage)
│
└─<──────────────────<──────────<─ AWS Lambda & RDS Data API
从那里,使用 Spark 过滤记录以识别逾期付款并相应地标记它们。将处理后的数据临时保存在 Amazon S3 存储桶中,并使用 AWS Lambda 以及 RDS Data API(Aurora 数据库集群的 Web 服务接口)根据 S3 中的数据更新原始 payments
表.
import sys
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import current_date, col, lit
from pyspark.sql import SparkSession
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Reading data from Aurora
payments_df = spark.read.format("jdbc",
url="jdbc:postgresql://<your-aurora-endpoint>:5432/yourdatabase",
dbtable="payments",
properties={"user": "yourusername", "password": "yourpassword"})
# Identify overdue payments
overdue_payments_df = payments_df.filter(col("due_date") < current_date())
# Mark payments as Overdue
overdue_payments_marked_df = overdue_payments_df.withColumn("status", lit("Overdue"))
# Save processed data to S3
overdue_payments_marked_df.write.mode("overwrite").parquet("s3://your-bucket-name/overdue_payments/")
使用 AWS Lambda,手动或按计划触发(通过 Amazon EventBridge,以前称为 CloudWatch Events),从 S3 读取处理后的数据并使用 RDS 数据 API 更新 Aurora 数据库:
import boto3
import pandas as pd
import s3fs
def lambda_handler(event, context):
# Access the processed data in S3
fs = s3fs.S3FileSystem(anon=False)
with fs.open('s3://your-bucket-name/overdue_payments/', mode='rb') as f:
overdue_payments_df = pd.read_parquet(f)
# Convert your DataFrame to SQL update statements
# Execute SQL updates via RDS Data API
rds_client = boto3.client('rds-data')
# Example: update a single record
rds_client.execute_statement(
resourceArn='arn:aws:rds:region:account-id:cluster:your-cluster-id',
secretArn='arn:aws:secretsmanager:region:account-id:secret:your-secret-name',
database='yourdatabase',
sql='UPDATE payments SET status = "Overdue" WHERE id = ?;',
parameters=[{'name':'id', 'value':{'longValue': 123}}]
)