我正在尝试将大量数据从 VPC 中的 RDS Postgres 实例迁移到同一 VPC 中的 redshift 集群。我正在尝试使用 PySpark 和 AWS Glue 来执行此操作。我只想迁移最后 6 个月的数据,但是我的查询似乎正在执行整个相关表的加载,然后对其进行过滤,这会导致内存故障。这是我到目前为止的代码:
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
sc = SparkContext()
sc.setLogLevel('WARN')
glueContext = GlueContext(sc)
spark = glueContext.spark_session
datasource0 = glueContext.create_dynamic_frame.from_catalog(database="db", table_name="table")
datasource0.printSchema()
filtered_dyF = Filter.apply(frame = datasource0, f = lambda x: x["scandate"] > "2020-05-31")
print(filtered_dyF.count())
有什么方法可以在加载查询上应用该过滤器吗?这条路径目前尝试
select * from table
,我希望它改为 select * from table where scandate > "2020-05-31"
我最终只使用了 AWS Database Migration Service。实际上非常无痛
我只能使用 AWS Glue 来实现它。
sample_query = f"SELECT f1, f2 FROM short_url_mapping WHERE f4= '{request_id}' AND"
print("sample_query: " + sample_query)
additional_options = {
"hashpartitions": 5,
"hashfield": "request_id",
"enablePartitioningForSampleQuery": True,
"sampleQuery": sample_query
}
postgres_dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
database="database_name",
table_name="table_name",
additional_options=additional_options
)
您的示例查询必须以 AND 结尾,以便 AWS Glue 附加分区条件。
我遵循了此 AWS 文档 https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-pushdown.html
Glue:4.0,spark:3.3,Python:3