我可以在spark中的RDD.map操作中调用任意函数吗?

问题描述 投票:0回答:1

假设我有一个1000,000行的数据集。我将其分为400行。我将它们转换为具有1000,000 / 400 = 2500 rdd元素的rdd,以便将400行水平连接。

现在我的问题是我可以在rdd的map操作中调用任意函数。

rdd.map(ele => {
someFunction(ele)
}) 

def someFunction(){
   step 1 : restuctures the 400 rows back to their original format (somehow magically)
   step 2 : call a function like knn(x, y, 3) (nearest neighbour classifier) with that restructed data
   step 3 : return the result
}

knn函数来自此处的SMILE库。

这可能吗?是推荐的方法吗?它有什么陷阱?有什么想法吗?

我这样做的原因是因为Spark仅具有内置的LinearSVM分类器,而没有其他基于内核的分类器。因此,希望在跳完篮球之后,我应该能够利用Spark分布式地调用内核SVM分类器

scala apache-spark distributed-computing
1个回答
0
投票

这可能吗?是

您甚至可以将完整的1000000加载到rdd中。最简单的方法是先使用dataframe api加载它们。然后,您可以将数据重新划分为2500个分区,并在完整的rdd上使用mapPartitions。它将以rdd发出每个分区,您可以减少到3个最接近的关键邻居。

因此正确的方法是使用reduce,而不是在rdd上进行映射。 mapPartition函数的结果必须是rdd本身。

但是用map / reduce方式写knn可能很棘手。 mapPartitions中的代码在单个Taskrunner上运行。因此,您可以在函数内使用全局状态。只要代码本身返回rdd,该策略就可以[]

例如

val x = sc.broadcast("some value") 
df = spark.read.csv("s3://path/to/data/*") # can be multiple csv chunks with same format
df = df.repartition(2500)

def processPartition(rdd):
   knn = []
   for elem in rdd:
      if isNearNeighbor(elem, x.value):
         knn.append(elem)
   return sc.parallelize(knn) # or similar rdd creation funcion


top3KnnPerPartitionRdd = df.toRdd.mapPartitions(processPartition)
# now top3KnnPerPartitionRdd can be reduced again to top3 with a reduce function

这是推荐的方法吗?

我不知道这是否是编写分布式knn的好方法,因为我从不打扰内部如何实现算法。虽然有spark knn 3rd party软件包。我们使用梯度提升算法作为分类器,而不是knn。 xgboost软件包的使用范围非常广,维护良好,并且在kaggle挑战中大肆宣传,其表现优于其他几个分类器。

[还有另一个较新的api,适用于split-combine-apply模式。Pandas UDF它将允许您使用更新的数据框api。

您仍然像上面一样将所有记录加载到数据框中。然后拿出一个groupby进行分组,形成400条记录。然后在每个组上运行一个熊猫udf。 udf将以熊猫数据框的形式接收400条记录。您可以在此熊猫框架上自由使用任何python knn软件包。同样,您必须返回一个熊猫数据框。

@pandas_udf("element double", PandasUDFType.GROUPED_MAP)
def knn(pdf):
   return NearestNeighbors(pdf, x.value)

df = df.withColumn("random", round(rand()*400,0))
df.groupby("random").apply(knn).show()
© www.soinside.com 2019 - 2024. All rights reserved.