使用spark数据帧广播哈希联接

问题描述 投票:1回答:1

我试图在Spark 1.6.0中进行广播散列连接,但无法成功。以下是示例:

val DF1 = sqlContext.read.parquet("path1")

val DF2 = sqlContext.read.parquet("path2")


val Join = DF1.as("tc").join(broadcast(DF2.as("st")), Seq("col1"), "left_outer")

即使我使用广播提示,DF上的解释显示SortMergeOuterJoin。我认为其中一个原因是DF2大于20MB,默认属性spark.sql.autoBroadcastJoinThreshold是10 MB,但我无法在spark-shell中更改此变量的属性。我做错了什么。

我尝试如下

spark.sql.autoBroadcastJoinThreshold = 100MB

scala> spark.sql.autoBroadcastJoinThreshold=100MB
<console>:1: error: Invalid literal number
       spark.sql.autoBroadcastJoinThreshold=100MB

我需要设置此属性并尝试我是否可以执行广播散列连接并且这样做可以改善任何性能。我在stackoverflow上检查了很多线程但是没有成功。任何人都可以帮助我

scala apache-spark spark-dataframe
1个回答
3
投票

尝试执行以下操作:

编辑:这是Scala代码,Python代码如下

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res1: String = 10485760

scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "20971520")

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res3: String = 20971520

Python代码:如果我的内存对我有好处,那么无论何时传递SparkConf对象,它都会被克隆,因此您无法在上下文中更改它,但您可以在会话中进行更改。

首先,我检查当前大小的阈值,实际上它是10 Mb

>>> spark.conf.get('spark.sql.autoBroadcastJoinThreshold')
u'10485760'

现在我创建一个新的会话,不要担心DataFrames(是啊......数据集[Row])你可以有多个会话

spark_new = SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold","20971520").getOrCreate()

然后我确认已设置新的配置值

>>> spark_new.conf.get('spark.sql.autoBroadcastJoinThreshold')
u'20971520'

你去,加倍大小

注意:我使用Python,但只是在几个语法糖差异中添加一个val,你应该没问题。希望它能帮助或引导您朝着正确的方向前进

© www.soinside.com 2019 - 2024. All rights reserved.