最佳MapReduce算法,用于计算每个重叠间隔的数量

问题描述 投票:0回答:2

[a, b]格式中有数十亿个区间,所有这些区间都将数字空间分成多个单个区域。我打算在这件作品中输出所有单件,重叠间隔的数量。

例如:有3个区间,即:[1,7],[2,3],[6,8]。它应该输出结果如下:

[-∞, 1]: 0

[1, 2]: 1

[2, 3]: 2

[3, 6]: 1

[6, 7]: 2

[7, 8]: 1

[8, +∞]: 0

如果对于单个机器(不是MapReduce中的分布式解决方案),我知道解决方案可以将区间实例分解为start_nend_n,按数字排序并从左向右迭代并使用计数器计算金额当前的片断和输出。但我不确定如何将此算法拆分为分布式方式。

有什么建议?谢谢。

java hadoop apache-spark mapreduce distributed-computing
2个回答
1
投票

在mapreduce中,最简单的方法是将对中的每个数字写入reducer。排序洗牌阶段负责排序数量和减速器将负责修复。

例如对于输入对[1,7],Mapper输出将是:

key: NullWritable  Value: 1
key: NullWritable  Value: 7
key: NullWritable  Value: 1_7

使用相同的模式,所有映射器的输出形式将是:

key: NullWritable  Value: 1
key: NullWritable  Value: 7
key: NullWritable  Value: 1_7
key: NullWritable  Value: 2
key: NullWritable  Value: 3
key: NullWritable  Value: 2_3
key: NullWritable  Value: 6
key: NullWritable  Value: 8
key: NullWritable  Value: 6_8

sort-shuffle步骤将输出聚合为

Key: NullWritable  ListOfValue: [1,1_7,2,2_3,3,6,6_8,7,8]

Reducer遍历值列表(这将是一个有序列表)和

  • 将对值分隔为单独的列表[1_7, 2_3, 6_8]。您可以在文本中检查_的出现以找出该对。
  • 重新配对空格值,如下所示。

[-infinity, 1] [1, 2] [2, 3] [3, 6] [6, 7] [7, 8] [8, +infinity]

  • 重新配对时,只需检查上面列表的边界即可找到计数。您可以使用“_”拆分对,并通过parse函数转换为数字。

例如-infinity(比如一个非常大的负长-9999999)超出所有对范围,因此减速器输出将是

key:“[ - infinity,1]”(Text Type)value: 0 (IntWritable`类型)

同样对于对[1,2]1>=1 and 2<=7所以减速器输出

key:“[1,2]”(Text类型)value: 1 (IntWritable`类型)

对于[6,7]6>=1 and 7<=76>=6 and 7<=8这样的减速器输出

key:“[1,2]”(Text类型)value: 2 (IntWritable`类型)

等等...

注意:NullWritableJava hadoop API,代表null。您可以使用任何常数数据(比如NullWritable类型Hadoop Text)而不是Writable。这里的要点是确保所有映射器输出都应该由于相同的映射器键而降落到单个reducer。


0
投票

下面是一个有效的Spark代码(至少在你的例子中它给出了正确的结果:

由于2种笛卡儿产品,代码效率不高。

间隔比较的条件可能需要一些注意:)

请随意改进代码,并在此处发布您的改进答案。

import org.apache.spark.{SparkConf, SparkContext}

object Main {

  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("")
  val sc = new SparkContext(conf)

  case class Interval(start : Double, end : Double)

  def main(args: Array[String]): Unit = {

sc.setLogLevel("ERROR")

val input = List(Interval(1, 7), Interval(2, 3), Interval(6, 8))
val infinities = List(Double.NegativeInfinity, Double.PositiveInfinity)
val inputRdd = sc.parallelize(input)
val infinitiesRdd = sc.parallelize(infinities)

// Get unique flat boundary values  e.g.: Interval(1, 7) will give 2 boundary values: [1, 7]
val boundaries = inputRdd.flatMap(v => List(v.start, v.end)).distinct()
// Additionally we will need negative and positive infinities
val all_boundaries = boundaries.union(infinitiesRdd)

// Calculate all intervals
val intervals = all_boundaries
  // For each interval start get all possible interval ends
  .cartesian(all_boundaries)    // [(1, 2), (1, 3), (1, 6), (2, 1), ...]
  // Filter out invalid intervals (where begin is either less or equal to the end)  e.g.: from previous comment (2, 1) is invalid interval
  .filter(v => v._1 < v._2)     // [(1, 2), (1, 3), (1, 6), (2, 3), ...]
  // Find lesser interval end e.g.: in previous comment (1, 2) -> 2 is smallest value for the same start (1)
  .reduceByKey((a, b) => Math.min(a, b))  // [(1, 2) (2, 3), ...]

// Uncommend this to print intermediate result
// intervals.sortBy(_._1).collect().foreach(println)

// Get counts of overlapping intervals
val countsPerInterval = intervals
  // for each small interval get all possible intput intervals e.g.:
  .cartesian(inputRdd)    // [((1, 2), Interval(1, 7)), ((1, 2), Interval(2, 3)), ...]
  // Filter out intervals that do not overlap
  .filter{ case (smallInterval, inputInterval) => inputInterval.start <= smallInterval._1 && inputInterval.end >= smallInterval._2}   // [((1, 2), Interval(1, 7)), ((1, 2), Interval(2, 3)), ...]
  // Since we're not interested in intervals, but only in count of intervals -> change interval to 1 for reduction
  .mapValues(_ => 1)      //[((1, 2), 1), ((1, 2), 1), ...]
  // Calculate a sum per interval
  .reduceByKey(_ + _)   // [((1, 2), 2), ...]

// print result
countsPerInterval.sortBy(_._1).collect().foreach(println)
  }

}
© www.soinside.com 2019 - 2024. All rights reserved.