sash

Reputation: 3

Finding the least subsets in an RDD in Spark

Can anyone help me? I have an RDD of BitSets like

Array[scala.collection.mutable.BitSet] = Array(BitSet(1, 2), BitSet(1, 7), BitSet(8, 9, 10, 11), BitSet(1, 2, 3, 4), BitSet(8, 9, 10), BitSet(1, 2, 3))

I want an RDD of (BitSet(1, 2), BitSet(1, 7), BitSet(8, 9, 10)), that is, I need the least subsets: the BitSets that do not contain any other BitSet as a subset.

Explanation: among (1, 2), (1, 2, 3), (1, 2, 3, 4), the least subset is (1, 2). (1, 7) does not contain any other BitSet as a subset, so (1, 7) is also in the result.
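To pin down the expected semantics on a small local collection (no Spark), a quadratic filter works: keep a set only if no other set in the collection is contained in it. A minimal sketch with plain Scala collections:

```scala
import scala.collection.mutable.BitSet

val sets = Seq(BitSet(1, 2), BitSet(1, 7), BitSet(8, 9, 10, 11),
               BitSet(1, 2, 3, 4), BitSet(8, 9, 10), BitSet(1, 2, 3))

// Keep a set only if no *other* set is a subset of it
val minimal = sets.filter(s => !sets.exists(o => (o ne s) && o.subsetOf(s)))
println(minimal) // List(BitSet(1, 2), BitSet(1, 7), BitSet(8, 9, 10))
```

This is O(n²) and only illustrates the definition; the aggregation approach in the answer below distributes the same idea across partitions.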

Upvotes: 0

Views: 213

Answers (1)

Nazarii Bardiuk

Reputation: 4342

This problem can be solved with aggregation:

import scala.collection.mutable.BitSet

val rdd = sc.parallelize(
  Seq(BitSet(1, 2),
      BitSet(1, 7),
      BitSet(8, 9, 10, 11),
      BitSet(1, 2, 3, 4),
      BitSet(8, 9, 10),
      BitSet(1, 2, 3)))
// Fold each partition with `accumulate`, then combine partitions with `merge`
val result = rdd.aggregate(Seq[BitSet]())(accumulate, merge)
println(result)

output:

List(BitSet(8, 9, 10), BitSet(1, 7), BitSet(1, 2))

accumulate is the function that folds the elements of each Spark partition into a running collection of minimal BitSets:

def accumulate(acc: Seq[BitSet], elem: BitSet): Seq[BitSet] = {
  // Drop every accumulated set that is a superset of the new element
  val subsets = acc.filterNot(elem.subsetOf)
  // Keep the new element only if it is not a superset of a remaining set
  if (notSuperSet(subsets)(elem)) elem +: subsets else subsets
}
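As a sanity check outside Spark, the same fold can be run over the sample data with plain Scala collections (a standalone sketch; `aggregate` applies exactly this fold per partition):

```scala
import scala.collection.mutable.BitSet

def notSuperSet(subsets: Seq[BitSet])(set: BitSet): Boolean =
  subsets.forall(!_.subsetOf(set))

def accumulate(acc: Seq[BitSet], elem: BitSet): Seq[BitSet] = {
  val subsets = acc.filterNot(elem.subsetOf)
  if (notSuperSet(subsets)(elem)) elem +: subsets else subsets
}

val data = Seq(BitSet(1, 2), BitSet(1, 7), BitSet(8, 9, 10, 11),
               BitSet(1, 2, 3, 4), BitSet(8, 9, 10), BitSet(1, 2, 3))
val minimal = data.foldLeft(Seq[BitSet]())(accumulate)
println(minimal) // List(BitSet(8, 9, 10), BitSet(1, 7), BitSet(1, 2))
```

Note how BitSet(8, 9, 10, 11) enters the accumulator first but is evicted as soon as its subset BitSet(8, 9, 10) arrives.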

merge combines the results of two partitions:

def merge(left: Seq[BitSet], right: Seq[BitSet]): Seq[BitSet] = {
  // Keep left sets that contain no set from the right
  val leftSubsets = left.filter(notSuperSet(right))
  // Keep right sets that contain no surviving left set
  val rightSubsets = right.filter(notSuperSet(leftSubsets))
  leftSubsets ++ rightSubsets
}
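For example, merging two partition results where each side is already minimal on its own (a standalone sketch with hypothetical partition contents):

```scala
import scala.collection.mutable.BitSet

def notSuperSet(subsets: Seq[BitSet])(set: BitSet): Boolean =
  subsets.forall(!_.subsetOf(set))

def merge(left: Seq[BitSet], right: Seq[BitSet]): Seq[BitSet] = {
  val leftSubsets = left.filter(notSuperSet(right))
  val rightSubsets = right.filter(notSuperSet(leftSubsets))
  leftSubsets ++ rightSubsets
}

// Each side is minimal within its own partition, but not across partitions
val left  = Seq(BitSet(1, 2, 3), BitSet(8, 9, 10))
val right = Seq(BitSet(1, 2), BitSet(8, 9, 10, 11))
println(merge(left, right)) // List(BitSet(8, 9, 10), BitSet(1, 2))
```

BitSet(1, 2, 3) is discarded because the right side holds its subset BitSet(1, 2), and BitSet(8, 9, 10, 11) is discarded against the surviving BitSet(8, 9, 10).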

and a helper function notSuperSet, a predicate that checks that a set is not a superset of any of the given sets:

// True when none of `subsets` is contained in `set`
def notSuperSet(subsets: Seq[BitSet])(set: BitSet): Boolean =
  subsets.forall(!_.subsetOf(set))
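A quick illustration of the predicate on its own (standalone, with sample inputs):

```scala
import scala.collection.mutable.BitSet

def notSuperSet(subsets: Seq[BitSet])(set: BitSet): Boolean =
  subsets.forall(!_.subsetOf(set))

val known = Seq(BitSet(1, 2), BitSet(1, 7))
// BitSet(1, 2, 3) contains BitSet(1, 2), so it IS a superset of a known set
println(notSuperSet(known)(BitSet(1, 2, 3))) // false
// BitSet(8, 9) contains none of the known sets
println(notSuperSet(known)(BitSet(8, 9)))    // true
```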

Upvotes: 4
