各位开发者大家好,
我目前正在开发一个 PySpark 项目,我需要在两个大型数据帧之间执行联接。一个数据帧包含大约 1000 万条以短字符串作为关键字(2-5 个单词)的条目,而另一个数据帧包含 3000 万条包含变体(5-10 个单词字符串)、商家和计数的记录。
目标是根据第一个数据帧中的关键字包含在第二个数据帧的变体中的条件来连接数据帧。然而,当前代码在大型 EMR 集群上运行了 3 个多小时,但仍未完成。
EMR配置
5个任务节点:m5.16xlarge(每个节点32核/256GB) 主节点:m5.8xlarge(4核/64GB)
spark-提交命令:
时间spark-submit--masteryarn--deploy-mode客户端--confspark.yarn.maxAppAttempts=1--packagesorg.apache.hadoop:hadoop-aws:2.7.0--num-executors30--conf Spark.driver.memoryOverhead=6g --conf Spark.executor.memoryOverhead=6g --executor-cores 5 --executor-memory 42g --driver-memory g 42 --conf Spark.yarn.executor.memoryOverhead=409 join_code。 py
这是我正在使用的代码的简化版本:
# Code for join
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataFrameJoin").getOrCreate()
# Loading dataframes
keywords_df = spark.read.parquet("keywords.parquet")
variations_df= spark.read.parquet("variations.parquet")
# Cross-joining based on keyword containment
result = keywords_df.join(variations_df,F.col(variations).contains(F.col(keyword)),how='left')
result.show()
如果模糊匹配花费了您大量时间,那么以下解决方案可能会有所帮助。
from pyspark.sql.types import *
from pyspark import SparkContext, SQLContext
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
import rapidfuzz
sc = SparkContext('local')
sqlContext = SQLContext(sc)
keyword_given = [
["green pstr",],
["greenpstr",],
["wlmrt", ],
["walmart",],
["walmart super",]
]
keywordColumns = ["keyword"]
keyword_df = sqlContext.createDataFrame(data=keyword_given, schema = keywordColumns)
print("keyword_df dataframe")
keyword_df.show(truncate=False)
variations = [
("type green pstr", "ABC", 100),
("type green pstr","PQR",200),
("type green pstr", "NZSD", 2999),
("wlmrt payment","walmart",200),
("wlmrt solutions", "walmart", 200),
("nppssdwlmrt", "walmart", 2000)
]
variationsColumns = ["variations", "entity", "ID"]
variations_df = sqlContext.createDataFrame(data=variations, schema = variationsColumns)
print("variations_df dataframe")
variations_df.show(truncate=False)
def evalutate_helper_spark(keyw, var):
return rapidfuzz.fuzz.partial_ratio(keyw, var)
calculate_ratio = udf(lambda keyw, var : evalutate_helper_spark(keyw, var))
middle_df = variations_df.crossJoin(keyword_df)
middle_df = middle_df.withColumn("partial_ratio", calculate_ratio(F.col("keyword"), F.col("variations")))
print("middle df show")
middle_df.show(n=100, truncate=False)
这是输出:
keyword_df dataframe
+-------------+
|keyword |
+-------------+
|green pstr |
|greenpstr |
|wlmrt |
|walmart |
|walmart super|
+-------------+
variations_df dataframe
+---------------+-------+----+
|variations |entity |ID |
+---------------+-------+----+
|type green pstr|ABC |100 |
|type green pstr|PQR |200 |
|type green pstr|NZSD |2999|
|wlmrt payment |walmart|200 |
|wlmrt solutions|walmart|200 |
|nppssdwlmrt |walmart|2000|
+---------------+-------+----+
middle df show
+---------------+-------+----+-------------+------------------+
|variations |entity |ID |keyword |partial_ratio |
+---------------+-------+----+-------------+------------------+
|type green pstr|ABC |100 |green pstr |100.0 |
|type green pstr|ABC |100 |greenpstr |88.88888888888889 |
|type green pstr|ABC |100 |wlmrt |33.333333333333336|
|type green pstr|ABC |100 |walmart |25.0 |
|type green pstr|ABC |100 |walmart super|40.0 |
|type green pstr|PQR |200 |green pstr |100.0 |
|type green pstr|PQR |200 |greenpstr |88.88888888888889 |
|type green pstr|PQR |200 |wlmrt |33.333333333333336|
|type green pstr|PQR |200 |walmart |25.0 |
|type green pstr|PQR |200 |walmart super|40.0 |
|type green pstr|NZSD |2999|green pstr |100.0 |
|type green pstr|NZSD |2999|greenpstr |88.88888888888889 |
|type green pstr|NZSD |2999|wlmrt |33.333333333333336|
|type green pstr|NZSD |2999|walmart |25.0 |
|type green pstr|NZSD |2999|walmart super|40.0 |
|wlmrt payment |walmart|200 |green pstr |46.15384615384615 |
|wlmrt payment |walmart|200 |greenpstr |50.0 |
|wlmrt payment |walmart|200 |wlmrt |100.0 |
|wlmrt payment |walmart|200 |walmart |83.33333333333334 |
|wlmrt payment |walmart|200 |walmart super|70.0 |
|wlmrt solutions|walmart|200 |green pstr |40.0 |
|wlmrt solutions|walmart|200 |greenpstr |36.36363636363637 |
|wlmrt solutions|walmart|200 |wlmrt |100.0 |
|wlmrt solutions|walmart|200 |walmart |83.33333333333334 |
|wlmrt solutions|walmart|200 |walmart super|70.0 |
|nppssdwlmrt |walmart|2000|green pstr |42.85714285714286 |
|nppssdwlmrt |walmart|2000|greenpstr |46.15384615384615 |
|nppssdwlmrt |walmart|2000|wlmrt |100.0 |
|nppssdwlmrt |walmart|2000|walmart |83.33333333333334 |
|nppssdwlmrt |walmart|2000|walmart super|55.55555555555556 |
+---------------+-------+----+-------------+------------------+