Reputation: 473
I have an RDD whose values are 0 or 1, and a limit of 4. When I map over the RDD, whenever a value is 1, that position and the next limit - 1 positions should all become 1; otherwise the values stay 0.
Example:
input : 1,0,0,0,0,0,1,0,0
expected output : 1,1,1,1,0,0,1,1,1
This is what I have tried so far :
import scala.collection.mutable.ArrayBuffer

val rdd = sc.parallelize(Array(1, 0, 0, 0, 0, 0, 1, 0, 0))
val limit = 4

val resultlimit = rdd.mapPartitions(parIter => {
  val result = new ArrayBuffer[Int]()
  val resultIter = new ArrayBuffer[Int]()
  // materialize the whole partition into a buffer first
  while (parIter.hasNext) {
    resultIter.append(parIter.next())
  }
  // walk the buffer; on a 1, emit up to `limit` ones and skip ahead
  var i = 0
  while (i < resultIter.length) {
    result.append(resultIter(i))
    if (resultIter(i) == 1) {
      var j = 1
      while (j + i < resultIter.length && j < limit) {
        result.append(1)
        j += 1
      }
      i += j
    } else {
      i += 1
    }
  }
  result.toIterator
})

resultlimit.foreach(println)
The result of resultlimit is the expected RDD: [1,1,1,1,0,0,1,1,1].
My quick-and-dirty approach of first building up an Array works, but it is ugly and inefficient.
Is there any cleaner solution?
Upvotes: 0
Views: 111
Reputation: 41987
Here is an improved approach to your requirement. The three while loops are reduced to one for loop, and the two ArrayBuffers are reduced to one, so both processing time and memory usage are reduced.
import scala.collection.mutable.ArrayBuffer

val resultlimit = rdd.mapPartitions(parIter => {
  val result = new ArrayBuffer[Int]()
  var limit = 0 // countdown of how many more 1s still have to be emitted
  for (value <- parIter) {
    if (value == 1) limit = 4 // a 1 (re)starts a run of four 1s
    if (limit > 0) {
      result.append(1)
      limit -= 1
    } else {
      result.append(value)
    }
  }
  result.toIterator
})
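For a quick check on the sample data from the question, it may help to force a single partition so the whole sequence is handled together (the explicit numSlices argument of 1 below is my assumption, not part of the original code):
val rdd = sc.parallelize(Array(1, 0, 0, 0, 0, 0, 1, 0, 0), 1)
// ...define resultlimit with the mapPartitions snippet above...
resultlimit.collect().foreach(println) // prints 1 1 1 1 0 0 1 1 1, one value per line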
Edited
The solution above works when the original rdd is not explicitly partitioned. But when a partition count is defined, as in
val rdd = sc.parallelize(Array(1,1,0,0,0,0,1,0,0), 4)
we need to collect the RDD, because the solution above runs separately on each partition, so a run of 1s cannot continue across a partition boundary.
So the following solution, which runs on the driver, should work:
import scala.collection.mutable.ArrayBuffer

val result = new ArrayBuffer[Int]()
var limit = 0 // countdown of how many more 1s still have to be emitted
// collect() brings every value to the driver, so the countdown spans partition boundaries
for (value <- rdd.collect()) {
  if (value == 1) limit = 4
  if (limit > 0) {
    result.append(1)
    limit -= 1
  } else {
    result.append(value)
  }
}
result.foreach(println)
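If you need the outcome as an RDD again rather than a local buffer, one option (my addition, not part of the original answer) is to re-parallelize the collected result:
val resultRdd = sc.parallelize(result) // redistribute the driver-side buffer as an RDD
resultRdd.foreach(println)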
Upvotes: 0
Reputation: 330453
Plain and simple. Import RDDFunctions:
import org.apache.spark.mllib.rdd.RDDFunctions._
Define a limit:
val limit: Int = 4
Prepend limit - 1 zeros to the first partition:
val extended = rdd.mapPartitionsWithIndex {
  case (0, iter) => Seq.fill(limit - 1)(0).toIterator ++ iter
  case (_, iter) => iter
}
Slide over the RDD:
val result = extended.sliding(limit).map {
  slice => if (slice.exists(_ != 0)) 1 else 0
}
Check the result:
val expected = Seq(1,1,1,1,0,0,1,1,1)
require(expected == result.collect.toSeq)
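To see why this works, here is the same pad-and-window idea traced locally with plain Scala collections (an illustrative sketch only, not part of the Spark code above):
val limit = 4
val data = Seq(1, 0, 0, 0, 0, 0, 1, 0, 0)
// pad with limit - 1 leading zeros, then mark every window of size `limit` that contains a 1
val padded = Seq.fill(limit - 1)(0) ++ data
val local = padded.sliding(limit).map(w => if (w.exists(_ != 0)) 1 else 0).toSeq
// local == Seq(1, 1, 1, 1, 0, 0, 1, 1, 1)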
On a side note, your current approach doesn't account for partition boundaries, so the result will vary depending on how the input is partitioned.
Upvotes: 1