有没有办法手动设置RDD分区的首选位置?我想确保在某台机器上计算某些分区。
我正在使用数组和'Parallelize'方法从中创建一个RDD。
另外我不使用HDFS,文件在本地磁盘上。这就是我想修改执行节点的原因。
有没有办法手动设置RDD分区的preferredLocations?
是的,有,但它是RDD特定的,因此不同种类的RDD有不同的方法来做到这一点。
Spark使用RDD.preferredLocations
获取计算每个分区/拆分的首选位置列表(例如,HDFS文件的块位置)。
final def preferredLocations(split:Partition):Seq [String]
获取分区的首选位置,同时考虑RDD是否为检查点。
如你所见,方法是final
,这意味着没有人可以覆盖它。
当你看到source code的RDD.preferredLocations
时,你会看到RDD如何知道它的首选位置。它使用受保护的RDD.getPreferredLocations方法,自定义RDD可以(但不必)覆盖以指定放置首选项。
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
所以,现在问题已经“变形”为另一个关于什么是允许设置其首选位置的RDD的问题。找到你的,看看源代码。
我正在使用数组和'Parallelize'方法从中创建一个RDD。
如果你parallelize
你的本地数据集它不再是分布式的,可以是这样的,但是......为什么你要将Spark用于你可以在一台计算机/节点上本地处理的东西?
如果你坚持并且确实想要将Spark用于本地数据集,那么SparkContext.parallelize
背后的RDD就是......让我们来看看源代码... ParallelCollectionRDD does allow for location preferences。
让我们将你的问题改写为以下内容(希望我不会失去任何重要的事实):
什么是允许创建
ParallelCollectionRDD
并明确指定位置首选项的运算符?
令我惊讶的是(因为我不知道该功能),有一个这样的运算符,即SparkContext.makeRDD,...接受每个对象的一个或多个位置首选项(Spark节点的主机名)。
makeRDD [T](seq:Seq [(T,Seq [String])]):RDD [T]分配本地Scala集合以形成RDD,每个对象具有一个或多个位置首选项(Spark节点的主机名)。为每个集合项创建一个新分区。
换句话说,不是使用parallelise
你必须使用makeRDD
(在Scala的Spark Core API中可用,但我不确定我将离开作为家庭练习的Python :))
我正在应用于创建某种RDD的任何其他RDD运算符/转换的相同推理。