使用 Spark 引擎的 AWS Glue 批量处理

问题描述 投票:0回答:1

这是我的场景。

我有一个源数据库,它是带有 Postgresql 引擎的 aurora 数据库。我有一张名为

payments
的表。该表由数百万条记录组成,因此每天结束时我需要读取数据并检查是否有任何逾期付款,如果有任何付款逾期,我需要将付款标记为“逾期” .

如何使用 AWSgluespark 作业以优化的方式实现这一目标?可以利用哪些 AWS 组件来实现此要求?

提前致谢!

apache-spark amazon-s3 pyspark aws-glue batch-processing
1个回答
0
投票

您需要使用 AWS Glue 从您的 Aurora PostgreSQL

payments
表中提取数据。
awsglue Python 包 应该会有所帮助:您在 aws-samples/aws-glue-samples
 中有很多示例,并且您还可以在 DataFrame 中包含 
DataDirect JDBC 驱动程序

Aurora DB (PostgreSQL) ──┐ │ ├─> AWS Glue (ETL) ──> Process ──> S3 (Temporary Storage) │ └─<──────────────────<──────────<─ AWS Lambda & RDS Data API
从那里,使用 Spark 过滤记录以识别逾期付款并相应地标记它们。将处理后的数据临时保存在 Amazon S3 存储桶中,并使用 

AWS Lambda 以及 RDS Data API(Aurora 数据库集群的 Web 服务接口)根据 S3 中的数据更新原始 payments

 表.

import sys from awsglue.context import GlueContext from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from pyspark.sql.functions import current_date, col, lit from pyspark.sql import SparkSession args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session # Reading data from Aurora payments_df = spark.read.format("jdbc", url="jdbc:postgresql://<your-aurora-endpoint>:5432/yourdatabase", dbtable="payments", properties={"user": "yourusername", "password": "yourpassword"}) # Identify overdue payments overdue_payments_df = payments_df.filter(col("due_date") < current_date()) # Mark payments as Overdue overdue_payments_marked_df = overdue_payments_df.withColumn("status", lit("Overdue")) # Save processed data to S3 overdue_payments_marked_df.write.mode("overwrite").parquet("s3://your-bucket-name/overdue_payments/")
使用 AWS Lambda,手动或按计划触发(通过 

Amazon EventBridge,以前称为 CloudWatch Events),从 S3 读取处理后的数据并使用 RDS 数据 API 更新 Aurora 数据库:

import boto3 import pandas as pd import s3fs def lambda_handler(event, context): # Access the processed data in S3 fs = s3fs.S3FileSystem(anon=False) with fs.open('s3://your-bucket-name/overdue_payments/', mode='rb') as f: overdue_payments_df = pd.read_parquet(f) # Convert your DataFrame to SQL update statements # Execute SQL updates via RDS Data API rds_client = boto3.client('rds-data') # Example: update a single record rds_client.execute_statement( resourceArn='arn:aws:rds:region:account-id:cluster:your-cluster-id', secretArn='arn:aws:secretsmanager:region:account-id:secret:your-secret-name', database='yourdatabase', sql='UPDATE payments SET status = "Overdue" WHERE id = ?;', parameters=[{'name':'id', 'value':{'longValue': 123}}] )
    
© www.soinside.com 2019 - 2024. All rights reserved.