假设我有一个1000,000行的数据集。我将其分为400行。我将它们转换为具有1000,000 / 400 = 2500 rdd
元素的rdd,以便将400行水平连接。
现在我的问题是我可以在rdd的map操作中调用任意函数。
rdd.map(ele => {
someFunction(ele)
})
def someFunction(){
step 1 : restuctures the 400 rows back to their original format (somehow magically)
step 2 : call a function like knn(x, y, 3) (nearest neighbour classifier) with that restructed data
step 3 : return the result
}
knn函数来自此处的SMILE库。
这可能吗?是推荐的方法吗?它有什么陷阱?有什么想法吗?
我这样做的原因是因为Spark仅具有内置的LinearSVM分类器,而没有其他基于内核的分类器。因此,希望在跳完篮球之后,我应该能够利用Spark分布式地调用内核SVM分类器
这可能吗?是
您甚至可以将完整的1000000加载到rdd中。最简单的方法是先使用dataframe api加载它们。然后,您可以将数据重新划分为2500个分区,并在完整的rdd上使用mapPartitions。它将以rdd发出每个分区,您可以减少到3个最接近的关键邻居。
因此正确的方法是使用reduce,而不是在rdd上进行映射。 mapPartition函数的结果必须是rdd本身。
但是用map / reduce方式写knn可能很棘手。 mapPartitions中的代码在单个Taskrunner上运行。因此,您可以在函数内使用全局状态。只要代码本身返回rdd,该策略就可以[]
例如
val x = sc.broadcast("some value") df = spark.read.csv("s3://path/to/data/*") # can be multiple csv chunks with same format df = df.repartition(2500) def processPartition(rdd): knn = [] for elem in rdd: if isNearNeighbor(elem, x.value): knn.append(elem) return sc.parallelize(knn) # or similar rdd creation funcion top3KnnPerPartitionRdd = df.toRdd.mapPartitions(processPartition) # now top3KnnPerPartitionRdd can be reduced again to top3 with a reduce function
这是推荐的方法吗?
我不知道这是否是编写分布式knn的好方法,因为我从不打扰内部如何实现算法。虽然有spark knn 3rd party软件包。我们使用梯度提升算法作为分类器,而不是knn。 xgboost软件包的使用范围非常广,维护良好,并且在kaggle挑战中大肆宣传,其表现优于其他几个分类器。
[还有另一个较新的api,适用于split-combine-apply模式。Pandas UDF它将允许您使用更新的数据框api。
您仍然像上面一样将所有记录加载到数据框中。然后拿出一个groupby
进行分组,形成400条记录。然后在每个组上运行一个熊猫udf。 udf将以熊猫数据框的形式接收400条记录。您可以在此熊猫框架上自由使用任何python knn软件包。同样,您必须返回一个熊猫数据框。
@pandas_udf("element double", PandasUDFType.GROUPED_MAP)
def knn(pdf):
return NearestNeighbors(pdf, x.value)
df = df.withColumn("random", round(rand()*400,0))
df.groupby("random").apply(knn).show()