KAs

Reputation: 1868

Best MapReduce algorithm to calculate the number of overlapping intervals in every single piece

There are billions of intervals in the format [a, b], and together they cut the number line into multiple single pieces. I intend to output every single piece along with the number of intervals that overlap it.

For instance, given 3 intervals, namely [1, 7], [2, 3], and [6, 8], the output should be as below:

[-∞, 1]: 0

[1, 2]: 1

[2, 3]: 2

[3, 6]: 1

[6, 7]: 2

[7, 8]: 1

[8, +∞]: 0

For a single machine (rather than a distributed solution such as MapReduce), I know the approach: break each interval into its endpoints start_n and end_n, sort all the endpoints, then iterate from left to right while maintaining a counter of currently open intervals, outputting each piece with its count. But I'm not sure how this algorithm could be split up in a distributed way.
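
For reference, a minimal single-machine sketch of that sweep (in Java; the class name and output formatting are illustrative) might look like this:

import java.util.Map;
import java.util.TreeMap;

public class IntervalSweep {
    public static void main(String[] args) {
        long[][] intervals = {{1, 7}, {2, 3}, {6, 8}};

        // Emit +1 at each interval start and -1 at each interval end,
        // keyed and sorted by the boundary value.
        TreeMap<Long, Integer> delta = new TreeMap<>();
        for (long[] iv : intervals) {
            delta.merge(iv[0], 1, Integer::sum);
            delta.merge(iv[1], -1, Integer::sum);
        }

        // Sweep left to right; the running sum is the number of
        // intervals covering the piece between consecutive boundaries.
        String prev = "-infinity";
        int open = 0;
        for (Map.Entry<Long, Integer> e : delta.entrySet()) {
            System.out.println("[" + prev + ", " + e.getKey() + "]: " + open);
            open += e.getValue();
            prev = String.valueOf(e.getKey());
        }
        System.out.println("[" + prev + ", +infinity]: " + open);
    }
}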

Any suggestions? Thanks.

Upvotes: 0

Views: 201

Answers (2)

Gyanendra Dwivedi

Reputation: 5547

In MapReduce, the simplest approach would be to write each number in the pair to the reducer. The sort-shuffle phase takes care of sorting the numbers, and the reducer takes care of re-pairing them.

e.g. for the input pair [1,7] the Mapper output would be:

key: NullWritable  Value: 1
key: NullWritable  Value: 7
key: NullWritable  Value: 1_7

With the same pattern, the output from all mappers would be:

key: NullWritable  Value: 1
key: NullWritable  Value: 7
key: NullWritable  Value: 1_7
key: NullWritable  Value: 2
key: NullWritable  Value: 3
key: NullWritable  Value: 2_3
key: NullWritable  Value: 6
key: NullWritable  Value: 8
key: NullWritable  Value: 6_8
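
A minimal Mapper sketch of this emission step (assuming one comma-separated "a,b" pair per input line; the class name and parsing details are illustrative) could look like:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits both endpoints and the "start_end" pair itself, all under a
// single NullWritable key so that everything lands in one reducer.
public class IntervalMapper
        extends Mapper<LongWritable, Text, NullWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumed input format: "start,end" per line, e.g. "1,7".
        String[] parts = value.toString().split(",");
        context.write(NullWritable.get(), new Text(parts[0]));
        context.write(NullWritable.get(), new Text(parts[1]));
        context.write(NullWritable.get(), new Text(parts[0] + "_" + parts[1]));
    }
}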

The sort-shuffle step aggregates all of these records under the single key. (Note that Hadoop sorts keys, not values; the value list below is shown in order, but in practice the reducer must sort it itself, or a secondary sort must be configured.)

Key: NullWritable  ListOfValue: [1,1_7,2,2_3,3,6,6_8,7,8]

The Reducer then iterates through the ordered list of values (a code sketch follows the examples below) and

  • Segregates the pair values into a separate list [1_7, 2_3, 6_8]. You may just check for the occurrence of _ in the text to identify a pair.

  • Re-pairs the remaining boundary values into consecutive pieces as below.

[-infinity, 1] [1, 2] [2, 3] [3, 6] [6, 7] [7, 8] [8, +infinity]

  • While re-pairing, checks each piece's boundaries against the pair list above to find the count. You may split each pair on "_" and convert the parts into numbers via parse functions.

e.g. -infinity (say a very large negative long such as -9999999) lies outside every pair's range, hence the reducer output will be

key:"[-infinity, 1]" (Text Type)value: 0 (IntWritable` type)

Similarly, for the piece [1, 2]: 1 >= 1 and 2 <= 7 (inside [1, 7]), so the reducer outputs

key:"[1, 2]" (Text Type)value: 1 (IntWritable` type)

For the piece [6, 7]: 6 >= 1 and 7 <= 7 (inside [1, 7]), and 6 >= 6 and 7 <= 8 (inside [6, 8]), so the reducer outputs

key:"[1, 2]" (Text Type)value: 2 (IntWritable` type)

and so on...
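
A sketch of such a Reducer (sorting the boundary values itself, since Hadoop does not sort the values for a key; the class name and the infinity placeholders are illustrative) could be:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IntervalReducer
        extends Reducer<NullWritable, Text, Text, IntWritable> {

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        List<long[]> pairs = new ArrayList<>();  // the original intervals
        TreeSet<Long> bounds = new TreeSet<>();  // sorted, de-duplicated boundaries

        for (Text v : values) {
            String s = v.toString();
            if (s.contains("_")) {               // "_" marks a pair value
                String[] p = s.split("_");
                pairs.add(new long[]{Long.parseLong(p[0]), Long.parseLong(p[1])});
            } else {
                bounds.add(Long.parseLong(s));
            }
        }

        long prev = Long.MIN_VALUE;              // stands in for -infinity
        for (long b : bounds) {
            context.write(new Text("[" + prev + ", " + b + "]"),
                          new IntWritable(countCovering(pairs, prev, b)));
            prev = b;
        }
        // Nothing can cover a piece extending to +infinity.
        context.write(new Text("[" + prev + ", +infinity]"), new IntWritable(0));
    }

    // Counts how many original intervals fully cover the piece [lo, hi].
    private static int countCovering(List<long[]> pairs, long lo, long hi) {
        int count = 0;
        for (long[] p : pairs) {
            if (p[0] <= lo && hi <= p[1]) count++;
        }
        return count;
    }
}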

Note: NullWritable is part of the Java Hadoop API and represents just null. Instead of NullWritable, you may use any constant value (say a Hadoop Text Writable). The main point is to make sure that all the mapper output lands in a single reducer, because every record shares the same mapper key.

Upvotes: 1

Vladislav Varslavans

Reputation: 2934

Below is working Spark code (at least with your example it gives the correct result):

The code is not very efficient due to the 2 cartesian products.

Also the condition for interval comparison might require some attention :)

Feel free to improve the code and post your improved answer here.

import org.apache.spark.{SparkConf, SparkContext}

object Main {

  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("")
  val sc = new SparkContext(conf)

  case class Interval(start: Double, end: Double)

  def main(args: Array[String]): Unit = {

    sc.setLogLevel("ERROR")

    val input = List(Interval(1, 7), Interval(2, 3), Interval(6, 8))
    val infinities = List(Double.NegativeInfinity, Double.PositiveInfinity)
    val inputRdd = sc.parallelize(input)
    val infinitiesRdd = sc.parallelize(infinities)

    // Get the unique flat boundary values, e.g. Interval(1, 7) gives 2 boundary values: [1, 7]
    val boundaries = inputRdd.flatMap(v => List(v.start, v.end)).distinct()
    // Additionally we will need the negative and positive infinities
    val all_boundaries = boundaries.union(infinitiesRdd)

    // Calculate all elementary intervals
    val intervals = all_boundaries
      // For each interval start, pair it with all possible interval ends
      .cartesian(all_boundaries)    // [(1, 2), (1, 3), (1, 6), (2, 1), ...]
      // Filter out invalid intervals (where the start is greater than or equal to the end), e.g. (2, 1) from the previous comment is invalid
      .filter(v => v._1 < v._2)     // [(1, 2), (1, 3), (1, 6), (2, 3), ...]
      // Keep the smallest end for each start, e.g. for start 1 the smallest end is 2, giving the elementary interval (1, 2)
      .reduceByKey((a, b) => Math.min(a, b))  // [(1, 2), (2, 3), ...]

    // Uncomment this to print the intermediate result
    // intervals.sortBy(_._1).collect().foreach(println)

    // Get the counts of overlapping intervals
    val countsPerInterval = intervals
      // Pair each small interval with every input interval
      .cartesian(inputRdd)    // [((1, 2), Interval(1, 7)), ((1, 2), Interval(2, 3)), ...]
      // Keep only the input intervals that fully cover the small interval
      .filter { case (smallInterval, inputInterval) => inputInterval.start <= smallInterval._1 && inputInterval.end >= smallInterval._2 }   // [((1, 2), Interval(1, 7)), ((1, 2), Interval(2, 3)), ...]
      // Since we only need the count of intervals, map each match to 1 for the reduction
      .mapValues(_ => 1)      // [((1, 2), 1), ((1, 2), 1), ...]
      // Calculate the sum per interval
      .reduceByKey(_ + _)   // [((1, 2), 2), ...]

    // Print the result
    countsPerInterval.sortBy(_._1).collect().foreach(println)
  }
}

Upvotes: 0
