add-semi-colons

Reputation: 18830

Iterating over cogrouped RDD

I used the cogroup function and obtained the following RDD:

org.apache.spark.rdd.RDD[(Int, (Iterable[(Int, Long)], Iterable[(Int, Long)]))]

Before the map operation, the joined object looks like this:

RDD[(Int, (Iterable[(Int, Long)], Iterable[(Int, Long)]))]

(-2095842000,(CompactBuffer((1504999740,1430096464017), (613904354,1430211912709), (-1514234644,1430288363100), (-276850688,1430330412225)),CompactBuffer((-511732877,1428682217564), (1133633791,1428831320960), (1168566678,1428964645450), (-407341933,1429009306167), (-1996133514,1429016485487), (872888282,1429031501681), (-826902224,1429034491003), (818711584,1429111125268), (-1068875079,1429117498135), (301875333,1429121399450), (-1730846275,1429131773065), (1806256621,1429135583312))))
(352234000,(CompactBuffer((1350763226,1430006650167), (-330160951,1430320010314)),CompactBuffer((2113207721,1428994842593), (-483470471,1429324209560), (1803928603,1429426861915))))

Now I want to do the following:

val globalBuffer = ListBuffer[Double]()
val joined = data1.cogroup(data2).map(x => {
  val listA = x._2._1.toList
  val listB = x._2._2.toList
  for(tupleB <- listB) {
    val localResults = ListBuffer[Double]()
    val itemToTest = Set(tupleB._1)
    val tempList = ListBuffer[(Int, Double)]()
    for(tupleA <- listA) {
      val tValue = someFunctionReturnDouble(tupleB._2, tupleA._2)
      val i = (tupleA._1, tValue)
      tempList += i
    }
    val sortList = tempList.sortWith(_._2 > _._2).slice(0,20).map(i => i._1)
    val intersect = sortList.toSet.intersect(itemToTest)
    if (intersect.size > 0)
      localResults += 1.0
    else localResults += 0.0
    val normalized = sum(localResults.toList)/localResults.size
    globalBuffer += normalized
  }
})

//method sum
def sum(xs: List[Double]): Double = xs.sum

At the end of this I was expecting joined to be a list of Double values, but when I looked at it, it was Unit. I also suspect this is not the Scala way of doing it. How do I obtain globalBuffer as the final result?

Upvotes: 1

Views: 445

Answers (2)

Jason Scott Lenderman

Reputation: 1918

RDD transformations are not going to modify globalBuffer. Copies of globalBuffer are made and sent out to each of the workers, but modifications to those copies on the workers never reach the globalBuffer that exists on the driver (the one you defined outside the map on the RDD). Here's what I would do (with a few additional modifications):

import scala.collection.mutable.ListBuffer

val joined = data1.cogroup(data2) map { x =>
  val iterA = x._2._1
  val iterB = x._2._2
  var count, positiveCount = 0
  // Reused across iterations to avoid allocating a new buffer per tupleB
  val tempList = ListBuffer[(Int, Double)]()
  for (tupleB <- iterB) {
    tempList.clear()
    for (tupleA <- iterA) {
      val tValue = someFunctionReturnDouble(tupleB._2, tupleA._2)
      tempList += ((tupleA._1, tValue))
    }
    // Top 20 entries by descending score
    val sortList = tempList.sortWith(_._2 > _._2).iterator.take(20)
    if (sortList.exists(_._1 == tupleB._1)) positiveCount += 1
    count += 1
  }
  // Proportion of hits for this key (NaN if iterB is empty)
  positiveCount.toDouble/count
}

At this point you can obtain a local copy of the proportions on the driver by using joined.collect.
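
The per-key logic above can also be pulled out into a pure function, which makes it testable without a cluster. This is only a minimal sketch, not part of the original answer: the names HitRate, hitRate, groupA, groupB, score and k are hypothetical, and score stands in for someFunctionReturnDouble, which is assumed here to take two Long values. It also assumes that an empty second group should give 0.0 rather than NaN.

```scala
object HitRate {
  // For each element of groupB, rank every element of groupA by
  // score(bValue, aValue), highest first, and check whether the id of the
  // groupB element appears among the top k ids. Returns the fraction of
  // groupB elements for which it does.
  def hitRate(
      groupA: Iterable[(Int, Long)],
      groupB: Iterable[(Int, Long)],
      score: (Long, Long) => Double, // hypothetical stand-in for someFunctionReturnDouble
      k: Int = 20
  ): Double = {
    val hits = groupB.count { case (idB, valueB) =>
      groupA.toSeq
        .map { case (idA, valueA) => (idA, score(valueB, valueA)) }
        .sortWith(_._2 > _._2) // descending by score
        .take(k)
        .exists { case (idA, _) => idA == idB }
    }
    // Assumption: an empty groupB yields 0.0 instead of NaN.
    if (groupB.isEmpty) 0.0 else hits.toDouble / groupB.size
  }
}
```

With Spark, something like data1.cogroup(data2).mapValues { case (a, b) => HitRate.hitRate(a, b, someFunctionReturnDouble) }.collect() should then return the proportions together with their keys.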

Upvotes: 1

Ashalynd

Reputation: 12573

Hmm, if I understood your code correctly, it could benefit from these improvements:

val joined = data1.cogroup(data2).map(x => {
  val listA = x._2._1.toList
  val listB = x._2._2.toList
  val localResults = listB.map {
    case (intBValue, longBValue) =>
      val itemToTest = intBValue // it's always one element
      val tempList = listA.map {
        case (intAValue, longAValue) =>
          (intAValue, someFunctionReturnDouble(longBValue, longAValue))
      }
      // sortBy on the negated score sorts in descending order
      val sortList = tempList.sortBy(-_._2).slice(0,20).map(i => i._1)
      // no real need to convert to a set for 20 elements, by the way
      if (sortList.toSet.contains(itemToTest)) { 1.0 } else {0.0}
  }
  sum(localResults)/localResults.size
})
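
For context on why the original joined came out as an RDD of Unit: in Scala a for loop without yield evaluates to Unit, and in the question it was the last expression of the function passed to map. The sketch below shows the same effect on a plain List, so no Spark is needed; the object name ForVsMap and its values are made up for illustration.

```scala
object ForVsMap {
  val xs = List(1, 2, 3)

  // The block ends with a for loop without yield, so every element maps to ().
  val units: List[Unit] = xs.map { x =>
    for (y <- List(x)) { y * 2.0 } // value is discarded, the loop evaluates to Unit
  }

  // When the last expression of the block is a value, map returns those values.
  val doubles: List[Double] = xs.map { x => x * 2.0 }
}
```

This is why the version above builds localResults with map and ends the outer block with an expression instead of appending to a buffer inside a loop.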

Upvotes: 1
