Pyspark 在从 Postgres 加载之前过滤结果(不要先加载整个表)

问题描述 投票:0回答:2

我正在尝试将大量数据从 VPC 中的 RDS Postgres 实例迁移到同一 VPC 中的 redshift 集群。我正在尝试使用 PySpark 和 AWS Glue 来执行此操作。我只想迁移最后 6 个月的数据,但是我的查询似乎正在执行整个相关表的加载,然后对其进行过滤,这会导致内存故障。这是我到目前为止的代码:

from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext

sc = SparkContext()
sc.setLogLevel('WARN')
glueContext = GlueContext(sc)
spark = glueContext.spark_session

datasource0 = glueContext.create_dynamic_frame.from_catalog(database="db", table_name="table")
datasource0.printSchema()

filtered_dyF = Filter.apply(frame = datasource0, f = lambda x: x["scandate"] > "2020-05-31")
print(filtered_dyF.count())

有什么方法可以在加载查询上应用该过滤器吗?这条路径目前尝试

select * from table
,我希望它改为
select * from table where scandate > "2020-05-31"

python postgresql apache-spark pyspark aws-glue
2个回答
0
投票

我最终只使用了 AWS Database Migration Service。实际上非常无痛


0
投票

我只能使用 AWS Glue 来实现它。

sample_query = f"SELECT f1, f2 FROM short_url_mapping WHERE f4= '{request_id}' AND"
print("sample_query: " + sample_query)

additional_options = {
    "hashpartitions": 5,
    "hashfield": "request_id",
    "enablePartitioningForSampleQuery": True,
    "sampleQuery": sample_query
}

postgres_dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
    database="database_name",
    table_name="table_name",
    additional_options=additional_options
)

您的示例查询必须以 AND 结尾,以便 AWS Glue 附加分区条件。

我遵循了此 AWS 文档 https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-pushdown.html

Glue:4.0,spark:3.3,Python:3

© www.soinside.com 2019 - 2024. All rights reserved.