我正在使用Apache Spark ML LSH的roximateSimilarityJoin方法加入2个数据集,但是我看到一些奇怪的行为。
((内部)加入后,数据集有点偏斜,但是每次一个或多个任务要花费非常多的时间才能完成。
您可以看到,每个任务的中位数为6毫秒(我正在较小的源数据集上对其进行测试),但是1个任务需要10分钟。它几乎不使用任何CPU周期,它实际上是在联接数据,但是太慢了。下一个最慢的任务在14秒内运行,记录增加了4倍,实际上溢出到磁盘上。
联接本身是pos和hashValue(minhash)上的两个数据集之间的内部联接(根据minhash规范和udf来计算匹配对之间的jaccard距离。
分解哈希表:
modelDataset.select(
struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols))
Jaccard距离功能:
override protected[ml] def keyDistance(x: Vector, y: Vector): Double = {
val xSet = x.toSparse.indices.toSet
val ySet = y.toSparse.indices.toSet
val intersectionSize = xSet.intersect(ySet).size.toDouble
val unionSize = xSet.size + ySet.size - intersectionSize
assert(unionSize > 0, "The union of two input sets must have at least 1 elements")
1 - intersectionSize / unionSize
}
加入已处理的数据集:
// Do a hash join on where the exploded hash values are equal.
val joinedDataset = explodedA.join(explodedB, explodeCols)
.drop(explodeCols: _*).distinct()
// Add a new column to store the distance of the two rows.
val distUDF = udf((x: Vector, y: Vector) => keyDistance(x, y), DataTypes.DoubleType)
val joinedDatasetWithDist = joinedDataset.select(col("*"),
distUDF(col(s"$leftColName.${$(inputCol)}"), col(s"$rightColName.${$(inputCol)}")).as(distCol)
)
// Filter the joined datasets where the distance are smaller than the threshold.
joinedDatasetWithDist.filter(col(distCol) < threshold)
我已经尝试过将缓存,重新分区甚至启用spark.speculation
组合使用,但都无济于事。
数据由必须匹配的带状地址文本组成:53536, Evansville, WI
=>53, 35, 36, ev, va, an, ns, vi, il, ll, le, wi
与城市或邮编中有错字的记录的距离将很短。
哪个会给出非常准确的结果,但可能是联接偏斜的原因。
我的问题是:
我也面临着同样的问题。您找到解决方案了吗?