mentongwu

Reputation: 473

How to get multiple adjacent values in an RDD with Scala Spark

I have an RDD whose values are 0 or 1, and a limit of 4. When I map the RDD, whenever a value is 1, that position and the following limit - 1 positions should all become 1; otherwise the value stays 0. For example:

input :           1,0,0,0,0,0,1,0,0
expected output : 1,1,1,1,0,0,1,1,1

This is what I have tried so far:

import scala.collection.mutable.ArrayBuffer

val rdd = sc.parallelize(Array(1, 0, 0, 0, 0, 0, 1, 0, 0))
val limit = 4
val resultlimit = rdd.mapPartitions(parIter => {
  var result = new ArrayBuffer[Int]()
  var resultIter = new ArrayBuffer[Int]()
  // buffer the whole partition first
  while (parIter.hasNext) {
    val iter = parIter.next()
    resultIter.append(iter)
  }
  // walk the buffer; on a 1, emit limit ones in total and skip ahead
  var i = 0
  while (i < resultIter.length) {
    result.append(resultIter(i))
    if (resultIter(i) == 1) {
      var j = 1
      while (j + i < resultIter.length && j < limit) {
        result.append(1)
        j += 1
      }
      i += j
    } else {
      i += 1
    }
  }
  result.toIterator
})
resultlimit.foreach(println)

The result of resultlimit is the RDD [1,1,1,1,0,0,1,1,1], which is what I expect.

My quick and dirty approach is to first buffer everything into an Array, but that is ugly and inefficient.

Is there any cleaner solution?

Upvotes: 0

Views: 111

Answers (2)

Ramesh Maharjan

Reputation: 41987

The following is an improved approach to your requirement. The three while loops are reduced to one for loop and the two ArrayBuffers to a single one, so both processing time and memory usage are reduced.

import scala.collection.mutable.ArrayBuffer

val resultlimit = rdd.mapPartitions(parIter => {
  val result = new ArrayBuffer[Int]()
  var limit = 0
  for (value <- parIter) {
    if (value == 1) limit = 4   // restart the countdown on every 1
    if (limit > 0) {
      result.append(1)          // still inside the fill window
      limit -= 1
    }
    else {
      result.append(value)
    }
  }
  result.toIterator
})
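As a quick check (assuming the sample rdd from the question, which sits in a single partition), collecting the result gives the expected sequence:

// assumes the rdd and limit from the question, on a single partition
resultlimit.collect()
// Array(1, 1, 1, 1, 0, 0, 1, 1, 1)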

Edited

The above solution works when you don't have partitioning defined on the original rdd. But when partitions are defined, as in

val rdd = sc.parallelize(Array(1,1,0,0,0,0,1,0,0), 4)

we need to collect the rdd, as the above solution would be executed separately on each partition.

So the following solution should work:

import scala.collection.mutable.ArrayBuffer

val result = new ArrayBuffer[Int]()
var limit = 0
for (value <- rdd.collect()) {
  if (value == 1) limit = 4
  if (limit > 0) {
    result.append(1)
    limit -= 1
  }
  else {
    result.append(value)
  }
}
result.foreach(println)
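If the result is needed as an RDD again rather than a local buffer, one option (a sketch, assuming the same sc as in the question) is to parallelize the collected buffer:

// turn the locally computed result back into an RDD
val resultRdd = sc.parallelize(result)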

Upvotes: 0

zero323

Reputation: 330453

Plain and simple. Import RDDFunctions:

import org.apache.spark.mllib.rdd.RDDFunctions._

Define a limit:

val limit: Int = 4

Prepend limit - 1 zeros to the first partition:

val extended = rdd.mapPartitionsWithIndex {
  case (0, iter) => Seq.fill(limit - 1)(0).toIterator ++ iter
  case (_, iter) => iter
}

Slide over the RDD:

val result = extended.sliding(limit).map {
  slice => if (slice.exists(_ != 0)) 1 else 0
}

Check the result:

val expected = Seq(1,1,1,1,0,0,1,1,1)
require(expected == result.collect.toSeq)

On a side note, your current approach doesn't correct for partition boundaries, so the result will vary depending on how the source data is partitioned.
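To see why, here is a minimal sketch: the fill helper below just restates the per-partition logic from the other answer for illustration, and the 4-partition split is an arbitrary choice to force a boundary inside the data.

import scala.collection.mutable.ArrayBuffer

// hypothetical helper: the same per-partition fill logic as above
def fill(limit: Int)(iter: Iterator[Int]): Iterator[Int] = {
  val result = new ArrayBuffer[Int]()
  var remaining = 0
  for (value <- iter) {
    if (value == 1) remaining = limit
    if (remaining > 0) { result.append(1); remaining -= 1 }
    else result.append(value)
  }
  result.toIterator
}

val data = Array(1, 0, 0, 0, 0, 0, 1, 0, 0)

// one partition: the fill can run across the whole sequence
sc.parallelize(data, 1).mapPartitions(fill(4)).collect().mkString(",")
// 1,1,1,1,0,0,1,1,1

// four partitions: each partition is filled independently, so a 1 near the
// end of a partition cannot propagate into the next one and the output changes
sc.parallelize(data, 4).mapPartitions(fill(4)).collect().mkString(",")
// e.g. 1,1,0,0,0,0,1,1,1 (the exact output depends on how Spark slices the data)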

Upvotes: 1
