Reputation: 17676
How should empty partitions be handled in mapPartitionsWithIndex?
A full example can be found here: https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2
My goal is to fill NaN values with the last known good value using the RDD API, as an improvement on the approach in "Spark / Scala: fill nan with last good observation".
However, some partitions do not contain any values:
###################### carry
Map(2 -> None, 5 -> None, 4 -> None, 7 -> Some(FooBar(2016-01-04,lastAssumingSameDate)), 1 -> Some(FooBar(2016-01-01,first)), 3 -> Some(FooBar(2016-01-02,second)), 6 -> None, 0 -> None)
(2,None)
(5,None)
(4,None)
(7,Some(FooBar(2016-01-04,lastAssumingSameDate)))
(1,Some(FooBar(2016-01-01,first)))
(3,Some(FooBar(2016-01-02,second)))
(6,None)
(0,None)
()
###################### carry
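import java.sql.Date
import org.apache.spark.rdd.RDD
import spark.implicits._ // assumes a SparkSession named spark, e.g. in spark-shell

case class FooBar(foo: Option[Date], bar: String)
val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"),
    ("2016-wrongFormat", "noValidFormat"),
    ("2016-01-04", "lastAssumingSameDate"))
  .toDF("foo", "bar")
  .withColumn("foo", 'foo.cast("Date"))
  .as[FooBar]
def notMissing(row: Option[FooBar]): Boolean = row.isDefined && row.get.foo.isDefined
myDf.rdd.filter(x => notMissing(Some(x))).count
// Last good row of each partition, keyed by partition index.
// collectAsMap returns a scala.collection.Map; toMap converts it to an immutable Map.
val toCarry: Map[Int, Option[FooBar]] = myDf.rdd.mapPartitionsWithIndex { case (i, iter) =>
  Iterator((i, iter.filter(x => notMissing(Some(x))).toSeq.lastOption))
}.collectAsMap.toMap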
When I then use the following to fill in the values, it crashes:
val toCarryBd = spark.sparkContext.broadcast(toCarry)
def fill(i: Int, iter: Iterator[FooBar]): Iterator[FooBar] = {
  if (iter.isEmpty) {
    iter
  } else {
    var lastNotNullRow: Option[FooBar] = toCarryBd.value.get(i).get
    iter.map(foo => {
      println("original ", foo)
      if (!notMissing(Some(foo))) {
        println("replaced")
        // this will go into the default case
        // FooBar(lastNotNullRow.getOrElse(FooBar(Option(Date.valueOf("2016-01-01")), "DUMMY")).foo, foo.bar)
        FooBar(lastNotNullRow.get.foo, foo.bar) // TODO: throws when lastNotNullRow is None
      } else {
        lastNotNullRow = Some(foo)
        foo
      }
    })
  }
}
val imputed: RDD[FooBar] = myDf.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }
Output after applying the suggestions from the answer (still not 100% there):
+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|2016-01-04|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
Upvotes: 1
Views: 2898
Reputation: 7452
When handling empty partitions with mapPartitions (and similar methods), the general approach is to return an empty iterator of the correct type when the input iterator is empty.
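As a minimal sketch of that convention (the function `tagWithFirst` and its element types are made up for illustration and are not from the question), a per-partition function can check `hasNext` before touching the data and return `Iterator.empty` otherwise. A plain `iter.map(...)` on an empty iterator already yields an empty iterator, so the explicit check mainly matters when the function would otherwise call `next()` or a similar method that fails on empty input.

```scala
// Hypothetical per-partition function: pairs every element with the first
// element of its partition. Calling next() on an empty iterator would throw,
// so the empty case is handled up front.
def tagWithFirst(index: Int, iter: Iterator[String]): Iterator[(Int, String, String)] =
  if (!iter.hasNext) {
    // Empty partition: return an empty iterator of the declared result type.
    Iterator.empty
  } else {
    val first = iter.next() // safe, hasNext was checked above
    Iterator((index, first, first)) ++ iter.map(s => (index, first, s))
  }

// With Spark, assuming some rdd: RDD[String], this could be used as
//   rdd.mapPartitionsWithIndex(tagWithFirst _)
```

Because the function only depends on `Iterator`, it can be tried on plain Scala iterators without a cluster.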
It looks like your code is doing this. However, you likely have a bug in your application logic: it assumes that if a partition has a record with a missing value, then either an earlier row in the same partition is good, or the previous partition is non-empty and contains a good row. Neither is guaranteed. You've partially fixed this by collecting the last good value of each partition and, when there is no good value at the start of a partition, looking it up in the collected values.
However, if the previous partition is also empty at that point, you will need to look up the partition before that, and so on, until you find a good value. (Note that this assumes the first record in your dataset is valid; if it isn't, your code will still fail.)
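One way to sketch that backward lookup is to precompute, on the driver, the value carried into each partition from the nearest earlier partition that has a good row. The helper name `resolveCarry` and its parameters are assumptions for illustration; the input has the same shape as the `toCarry` map in the question.

```scala
// Hypothetical helper: lastGood maps partition index -> last good value seen
// in that partition (None if the partition is empty or has no good rows).
// The result maps partition index -> value carried INTO that partition,
// i.e. the last good value from any strictly earlier partition.
def resolveCarry[T](lastGood: Map[Int, Option[T]], numPartitions: Int): Map[Int, Option[T]] = {
  // scanLeft threads the most recent good value through partitions 0, 1, 2, ...
  // running(i) holds what was seen before partition i; running(0) is None.
  val running = (0 until numPartitions).scanLeft(Option.empty[T]) { (prev, i) =>
    lastGood.getOrElse(i, None).orElse(prev)
  }
  (0 until numPartitions).map(i => i -> running(i)).toMap
}

// Same shape as the carry map printed in the question, with strings as values.
val example: Map[Int, Option[String]] = Map(
  0 -> None, 1 -> Some("first"), 2 -> None, 3 -> Some("second"),
  4 -> None, 5 -> None, 6 -> None, 7 -> Some("lastAssumingSameDate"))
val carryIn = resolveCarry(example, 8)
```

In the question's code, something like `resolveCarry(toCarry, myDf.rdd.getNumPartitions)` could be computed before broadcasting. Partitions with nothing good before them still get `None`, which is the caveat about the first record mentioned above.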
Your solution is really close to working; it just relies on a few assumptions that don't always hold.
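To drop the assumption that a good value is always available, the per-partition fill could keep the carried value as an `Option` and leave a row unchanged when nothing has been seen yet. This is only a sketch: `fillPartition` and its `carryIn` parameter are hypothetical names, and leaving the row untouched is one possible policy, not necessarily the one you want.

```scala
import java.sql.Date

case class FooBar(foo: Option[Date], bar: String)

// Hypothetical per-partition fill. carryIn is the last good row from earlier
// partitions, if any. Rows with a missing foo take the date of the most
// recent good row; if there is none yet, the row is returned unchanged
// instead of calling .get on a None.
def fillPartition(carryIn: Option[FooBar], iter: Iterator[FooBar]): Iterator[FooBar] = {
  // Mutable state is acceptable here because an iterator is consumed sequentially.
  var lastGood = carryIn
  iter.map { row =>
    if (row.foo.isDefined) {
      lastGood = Some(row)
      row
    } else {
      lastGood.fold(row)(good => row.copy(foo = good.foo))
    }
  }
}
```

An empty input iterator needs no special case here, since `map` on an empty iterator is itself empty.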
Upvotes: 2