Reputation: 1868
There are billions of intervals of the form [a, b], and together they cut the number line into many single pieces. I want to output each piece along with the number of intervals that overlap it.
For instance, given 3 intervals [1,7], [2,3], and [6,8], the output should be:
[-∞, 1]: 0
[1, 2]: 1
[2, 3]: 2
[3, 6]: 1
[6, 7]: 2
[7, 8]: 1
[8, +∞]: 0
On a single machine (as opposed to a distributed solution such as MapReduce), I know the solution: break each interval into its endpoints start_n and end_n, sort all the endpoints, then iterate from left to right while keeping a counter of the intervals open in the current piece, and output each piece with its count. But I'm not sure how this algorithm could be split into a distributed computation.
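For reference, a minimal sketch of that single-machine sweep in plain Java (Long.MIN_VALUE stands in for -∞):

import java.util.Map;
import java.util.TreeMap;

public class SweepLine {
    public static void main(String[] args) {
        long[][] intervals = {{1, 7}, {2, 3}, {6, 8}};

        // Each endpoint becomes an event: +1 where an interval opens, -1 where it closes.
        TreeMap<Long, Integer> delta = new TreeMap<>();
        for (long[] iv : intervals) {
            delta.merge(iv[0], 1, Integer::sum);
            delta.merge(iv[1], -1, Integer::sum);
        }

        // Sweep left to right, keeping a running count of open intervals.
        long prev = Long.MIN_VALUE; // stands in for -infinity
        int open = 0;
        for (Map.Entry<Long, Integer> e : delta.entrySet()) {
            System.out.printf("[%s, %d]: %d%n",
                    prev == Long.MIN_VALUE ? "-inf" : String.valueOf(prev),
                    e.getKey(), open);
            open += e.getValue();
            prev = e.getKey();
        }
        System.out.printf("[%d, +inf]: %d%n", prev, open);
    }
}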
Any suggestions? Thanks.
Upvotes: 0
Views: 201
Reputation: 5547
In MapReduce, the simplest approach would be to write each number in the pair to the reducer. The sort-shuffle phase takes care of sorting the numbers, and the reducer takes care of re-pairing them into pieces.
e.g. for the input pair [1,7], the Mapper output would be:
key: NullWritable Value: 1
key: NullWritable Value: 7
key: NullWritable Value: 1_7
With the same pattern, the output from all the mappers would be:
key: NullWritable Value: 1
key: NullWritable Value: 7
key: NullWritable Value: 1_7
key: NullWritable Value: 2
key: NullWritable Value: 3
key: NullWritable Value: 2_3
key: NullWritable Value: 6
key: NullWritable Value: 8
key: NullWritable Value: 6_8
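A minimal Mapper sketch of this idea could look as below (the class name IntervalMapper is made up here, and the input is assumed to arrive as one "start,end" pair per text line):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: assumes each input line holds one interval as "start,end".
public class IntervalMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] parts = line.toString().split(",");
        String start = parts[0].trim();
        String end = parts[1].trim();
        // Emit both endpoints plus the pair itself, all under the same null key
        // so that every record ends up at one reducer.
        context.write(NullWritable.get(), new Text(start));
        context.write(NullWritable.get(), new Text(end));
        context.write(NullWritable.get(), new Text(start + "_" + end));
    }
}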
The sort-shuffle step would aggregate the output as
Key: NullWritable ListOfValue: [1, 1_7, 2, 2_3, 3, 6, 6_8, 7, 8]
(Strictly speaking, Hadoop only sorts keys, not the values within a key, so either configure a secondary sort or sort the values inside the reducer.)
The Reducer iterates through the list of values and:
1. Segregates the pair values into a separate list [1_7, 2_3, 6_8]. You may just check for an occurrence of _ in the text to recognize a pair.
2. Re-pairs the remaining boundary values into pieces as below:
[-infinity, 1]
[1, 2]
[2, 3]
[3, 6]
[6, 7]
[7, 8]
[8, +infinity]
3. Checks each piece against every pair, parsing the numbers back out of the text. E.g. -infinity (say, a very big negative long such as -9999999) lies outside every pair's range, hence the reducer output will be:
key: "[-infinity, 1]" (Text type), value: 0 (IntWritable type)
Similarly, for piece [1, 2]: 1 >= 1 and 2 <= 7, so the reducer outputs:
key: "[1, 2]" (Text type), value: 1 (IntWritable type)
For piece [6, 7]: 6 >= 1 and 7 <= 7, and also 6 >= 6 and 7 <= 8, so the reducer outputs:
key: "[6, 7]" (Text type), value: 2 (IntWritable type)
and so on...
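Putting the above together, a minimal Reducer sketch could look as below (the class name IntervalReducer is made up, boundaries are treated as longs, and Long.MIN_VALUE/Long.MAX_VALUE stand in for the infinities; note the explicit sort, since Hadoop does not order the values for us):

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer: receives every boundary and every "start_end" pair
// under the single NullWritable key.
public class IntervalReducer extends Reducer<NullWritable, Text, Text, IntWritable> {

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        List<Long> boundaries = new ArrayList<>();
        List<long[]> pairs = new ArrayList<>();
        for (Text v : values) {
            String s = v.toString();
            if (s.contains("_")) { // pair values like "1_7" carry an underscore
                String[] p = s.split("_");
                pairs.add(new long[]{Long.parseLong(p[0]), Long.parseLong(p[1])});
            } else {               // plain boundary values
                boundaries.add(Long.parseLong(s));
            }
        }

        // Hadoop does not order values within a key, so sort explicitly;
        // duplicate boundaries are skipped while pairing below.
        Collections.sort(boundaries);
        boundaries.add(0, Long.MIN_VALUE); // -infinity sentinel
        boundaries.add(Long.MAX_VALUE);    // +infinity sentinel

        // Re-pair consecutive boundaries into pieces and count the pairs containing each piece.
        for (int i = 0; i + 1 < boundaries.size(); i++) {
            long lo = boundaries.get(i);
            long hi = boundaries.get(i + 1);
            if (lo == hi) continue; // duplicate boundary, empty piece
            int count = 0;
            for (long[] p : pairs) {
                if (lo >= p[0] && hi <= p[1]) count++;
            }
            String left = lo == Long.MIN_VALUE ? "-infinity" : String.valueOf(lo);
            String right = hi == Long.MAX_VALUE ? "+infinity" : String.valueOf(hi);
            context.write(new Text("[" + left + ", " + right + "]"), new IntWritable(count));
        }
    }
}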
Note: NullWritable is a Hadoop Java API type that represents just null. Instead of NullWritable you may use any constant value (say, a Hadoop Text Writable). The main point is to ensure that all the mapper output lands on a single reducer, which happens because every record shares the same mapper key.
Upvotes: 1
Reputation: 2934
Below is working Spark code (at least with your example it gives the correct result).
The code is not very efficient due to the two cartesian products.
Also the condition for interval comparison might require some attention :)
Feel free to improve the code and post your improved answer here.
import org.apache.spark.{SparkConf, SparkContext}

object Main {
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("")
  val sc = new SparkContext(conf)

  case class Interval(start: Double, end: Double)

  def main(args: Array[String]): Unit = {
    sc.setLogLevel("ERROR")

    val input = List(Interval(1, 7), Interval(2, 3), Interval(6, 8))
    val infinities = List(Double.NegativeInfinity, Double.PositiveInfinity)

    val inputRdd = sc.parallelize(input)
    val infinitiesRdd = sc.parallelize(infinities)

    // Get unique flat boundary values e.g.: Interval(1, 7) will give 2 boundary values: [1, 7]
    val boundaries = inputRdd.flatMap(v => List(v.start, v.end)).distinct()

    // Additionally we will need negative and positive infinities
    val all_boundaries = boundaries.union(infinitiesRdd)

    // Calculate all intervals
    val intervals = all_boundaries
      // For each interval start get all possible interval ends
      .cartesian(all_boundaries) // [(1, 2), (1, 3), (1, 6), (2, 1), ...]
      // Filter out invalid intervals (where the start is greater than or equal to the end) e.g.: from the previous comment (2, 1) is an invalid interval
      .filter(v => v._1 < v._2) // [(1, 2), (1, 3), (1, 6), (2, 3), ...]
      // For each start keep the smallest end e.g.: in the previous comment, (1, 2) -> 2 is the smallest end for the same start (1)
      .reduceByKey((a, b) => Math.min(a, b)) // [(1, 2), (2, 3), ...]

    // Uncomment this to print the intermediate result
    // intervals.sortBy(_._1).collect().foreach(println)

    // Get counts of overlapping intervals
    val countsPerInterval = intervals
      // for each small interval get all possible input intervals e.g.:
      .cartesian(inputRdd) // [((1, 2), Interval(1, 7)), ((1, 2), Interval(2, 3)), ...]
      // Filter out intervals that do not overlap
      .filter { case (smallInterval, inputInterval) => inputInterval.start <= smallInterval._1 && inputInterval.end >= smallInterval._2 } // [((1, 2), Interval(1, 7)), ((1, 2), Interval(2, 3)), ...]
      // Since we're not interested in the intervals themselves, but only in their count -> map each match to 1 for reduction
      .mapValues(_ => 1) // [((1, 2), 1), ((1, 2), 1), ...]
      // Re-add every piece with a count of 0 so pieces with no overlapping input interval (e.g. [-inf, 1]) still appear in the output
      .union(intervals.map(v => (v, 0)))
      // Calculate a sum per interval
      .reduceByKey(_ + _) // [((1, 2), 2), ...]

    // print result
    countsPerInterval.sortBy(_._1).collect().foreach(println)
  }
}
Upvotes: 0