关于spark的DBSCAN:哪个实现

问题描述 投票:7回答:4

我想在Spark上做一些DBSCAN。我目前发现了2个实现:

我已经使用其github中给出的sbt配置测试了第一个但是:

  • jar中的函数与doc或github上的源代码中的函数不同。例如,我在jar中找不到列车功能
  • 我设法使用fit函数(在jar中找到)运行测试,但是epsilon的错误配置(从小到大)将代码置于无限循环中。

代码:

val model = DBSCAN.fit(eps, minPoints, values, parallelism)

有人设法用第一个库做某事吗?

有人测试过第二个吗?

scala apache-spark cluster-analysis apache-spark-mllib dbscan
4个回答
9
投票

请尝试ELKI。由于这是Java,因此从Scala调用应该很容易。

ELKI经过了很好的优化,使用索引可以扩展到相当大的数据集。

我们尝试在我们的基准研究中包含其中一个Spark实现 - 但是内存耗尽(而且它是内存耗尽的唯一实现...... Spark和Mahout的k-means也是最慢的):

Hans-Peter Kriegel,Erich Schubert和Arthur Zimek。 The (black) art of runtime evaluation: Are we comparing algorithms or implementations? 在:知识和信息系统(KAIS)。 2016年,1-38

Neukirchen教授在本技术报告中对DBSCAN的并行实施进行了基准测试:

Helmut Neukirchen Survey and Performance Evaluation of DBSCAN Spatial Clustering Implementations for Big Data and High-Performance Computing Paradigms

显然他有一些Spark实现工作,但注意到:

结果是毁灭性的:Apache Spark的任何实现都没有接近HPC实现。特别是在较大(但仍然相当小)的数据集上,大多数数据集完全失败,甚至无法提供正确的结果。

早些时候:

在使用我们集群的所有可用内核的同时运行任何“Spark DBSCAN”实现时,我们遇到了内存不足异常。

(另外,“Spark DBSCAN”在928个核心上花了2406秒,ELKI在1个核心上用了997秒用于较小的基准测试 - 其他Spark实现也没有太好,特别是它没有返回正确的结果...... )

“DBSCAN on Spark”没有崩溃,但是返回了完全错误的集群。

虽然“DBSCAN on Spark”的完成速度更快,但它提供了完全错误的聚类结果。由于具有最大内核数量的Spark的DBSCAN实现已经无可救药地长时间运行,因此我们没有使用较少数量的内核执行测量。

您可以将double[][]数组包装为ELKI数据库:

// Adapter to load data from an existing array.
DatabaseConnection dbc = new ArrayAdapterDatabaseConnection(data);
// Create a database (which may contain multiple relations!)
Database db = new StaticArrayDatabase(dbc, null);
// Load the data into the database (do NOT forget to initialize...)
db.initialize();

// Squared Euclidean is faster than Euclidean.
Clustering<Model> c = new DBSCAN<NumberVector>(
  SquaredEuclideanDistanceFunction.STATIC, eps*eps, minpts).run(db);

for(Cluster<KMeansModel> clu : c.getAllClusters()) {
  // Process clusters
}

另请参阅:Java API example(特别是,如何将DBID映射回行索引)。为了获得更好的性能,将索引工厂(例如new CoverTree.Factory(...))作为第二个参数传递给StaticArrayDatabase构造函数。


3
投票

我在我的项目中成功使用了第二个库(https://github.com/alitouka/spark_dbscan)。实际上,我不能使用它如下:

libraryDependencies += "org.alitouka" % "spark_dbscan_2.10" % "0.0.4"

resolvers += "Aliaksei Litouka's repository" at "http://alitouka-public.s3-website-us-east-1.amazonaws.com/"

相反,我下载代码并将其更新为spark 2.2.1版本。此外,应添加一些库。最后,将代码添加到我的项目中,它可以工作!


1
投票

我测试了https://github.com/irvingc/dbscan-on-spark并且可以说它消耗了大量内存。对于具有平滑分布的400K数据集,我使用了-Xmx12084m,甚至在这种情况下它工作时间太长(> 20分钟)。另外,它只是2D。我用maven项目,而不是sbt。

我测试了第二个实现。这仍然是我发现的最好的。不幸的是,作者自2015年以来就不支持它。它确实需要一些时间来提升Spark的版本并解决版本冲突。我需要它在aws上部署。


0
投票

您还可以考虑使用smile,它提供了DBSCAN的实现。你必须以最直接的方式使用groupBymapGroupsflatMapGroups相结合,你可以在那里运行dbscan。这是一个例子:

  import smile.clustering._

  val dataset: Array[Array[Double]] = Array(
    Array(100, 100),
    Array(101, 100),
    Array(100, 101),
    Array(100, 100),
    Array(101, 100),
    Array(100, 101),

    Array(0, 0),
    Array(1, 0),
    Array(1, 2),
    Array(1, 1)
  )

  val dbscanResult = dbscan(dataset, minPts = 3, radius = 5)
  println(dbscanResult)

  // output
  DBSCAN clusters of 10 data points:
  0     6 (60.0%)
  1     4 (40.0%)
  Noise     0 ( 0.0%)

如果需要获得更多性能,还可以编写用户定义聚合函数(UDAF)。

我在工作中使用这种方法来对时间序列数据进行聚类,因此使用Spark的时间窗函数进行分组,然后能够在每个窗口中执行DBSCAN,这使我们能够并行化实现。

我受到以下article的启发才能做到这一点

© www.soinside.com 2019 - 2024. All rights reserved.