Georg Heiler

Reputation: 17676

spark map partitions to fill nan values

I want to fill nan values in spark using the last good known observation - see: Spark / Scala: fill nan with last good observation

My current solution uses window functions to accomplish the task, but this is not great, as all values are pulled into a single partition (a rough sketch of that window-based variant follows the table below). Something like

val imputed: RDD[FooBar] = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }

should work a lot better, but strangely my fill function is not executed. What is wrong with my code? The input data looks like this:

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|      null|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
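
For reference, here is a rough sketch of what such a window-based fill can look like. This is not my exact code; it assumes a synthetic ordering column created via monotonically_increasing_id:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{last, monotonically_increasing_id}

// Assumed ordering column; the real data would need a stable row order.
val withId = recordsDF.withColumn("id", monotonically_increasing_id())
val w = Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)

// last(..., ignoreNulls = true) carries the previous non-null foo forward,
// but an un-partitioned window forces all rows into a single partition.
val filled = withId
  .withColumn("foo", last("foo", ignoreNulls = true).over(w))
  .drop("id")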

Here is the full example code:

import java.sql.Date

import org.apache.log4j.{ Level, Logger }
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

case class FooBar(foo: Date, bar: String)

object WindowFunctionExample extends App {

  Logger.getLogger("org").setLevel(Level.WARN)
  val conf: SparkConf = new SparkConf()
    .setAppName("foo")
    .setMaster("local[*]")

  val spark: SparkSession = SparkSession
    .builder()
    .config(conf)
    .enableHiveSupport()
    .getOrCreate()

  import spark.implicits._

  val myDff = Seq(("2016-01-01", "first"), ("2016-01-02", "second"),
    ("2016-wrongFormat", "noValidFormat"),
    ("2016-01-04", "lastAssumingSameDate"))
  val recordsDF = myDff
    .toDF("foo", "bar")
    .withColumn("foo", 'foo.cast("Date"))
    .as[FooBar]
  recordsDF.show

  def notMissing(row: FooBar): Boolean = {
    row.foo != null
  }

  val toCarry = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) =>
    Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption))
  }.collectAsMap
  println("###################### carry ")
  println(toCarry)
  toCarry.foreach(println)
  println("###################### carry ")
  val toCarryBd = spark.sparkContext.broadcast(toCarry)

  def fill(i: Int, iter: Iterator[FooBar]): Iterator[FooBar] = {
    // Start from the last valid row carried for this partition index.
    var lastNotNullRow: FooBar = toCarryBd.value(i).get
    iter.map(row => {
      if (!notMissing(row)) {
        // Replace the missing date with the last known good one.
        FooBar(lastNotNullRow.foo, row.bar)
      } else {
        lastNotNullRow = row
        row
      }
    })
  }

  // Strangely, the fill logic above never seems to run for the null values.
  val imputed: RDD[FooBar] = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }
  val imputedDF = imputed.toDS()

  println(imputedDF.orderBy($"foo").collect.toList)
  imputedDF.show
  spark.stop
}

edit

I fixed the code as outlined by the comment, but toCarryBd still contains None values. How can this happen, given that I explicitly filter for non-None values?

def notMissing(row: FooBar): Boolean = { row.foo != null }
iter.filter(notMissing(_)).toSeq.lastOption

The collected map looks like this:

(2,None)
(5,None)
(4,None)
(7,Some(FooBar(2016-01-04,lastAssumingSameDate)))
(1,Some(FooBar(2016-01-01,first)))
(3,Some(FooBar(2016-01-02,second)))
(6,None)
(0,None)

This leads to a NoSuchElementException: None.get when trying to access toCarryBd.
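
(As a plain-Scala sanity check, outside of Spark: lastOption on an empty iterator is None no matter what the filter does, so an empty partition would hand back exactly that.)

val emptyPartition: Iterator[FooBar] = Iterator.empty
val carried = emptyPartition.filter(notMissing(_)).toSeq.lastOption
// carried == None, even though the filter never saw a null foo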

Upvotes: 0

Views: 727

Answers (1)

Daniel de Paula

Reputation: 17872

Firstly, if your foo field can be null, I would recommend creating the case class as:

case class FooBar(foo: Option[Date], bar: String)

Then, you can rewrite your notMissing function to something like:

def notMissing(row: Option[FooBar]): Boolean = row.isDefined && row.get.foo.isDefined
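
Building on that, here is a rough, untested sketch of how the rest of your pipeline could look with the Option-based case class. It reuses your recordsDF and spark, keeps your per-partition-index lookup as-is, and only guards the Option handling so that partitions whose carry is None no longer trigger None.get:

// Carry the last valid row per partition; empty partitions simply carry None.
val toCarry = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) =>
  Iterator((i, iter.filter(row => notMissing(Some(row))).toSeq.lastOption))
}.collectAsMap
val toCarryBd = spark.sparkContext.broadcast(toCarry)

def fill(i: Int, iter: Iterator[FooBar]): Iterator[FooBar] = {
  // Keep the carried value as an Option instead of calling .get.
  var lastNotNullRow: Option[FooBar] = toCarryBd.value(i)
  iter.map { row =>
    if (!notMissing(Some(row))) {
      // Fall back to the last known good date if one exists for this partition.
      lastNotNullRow.map(valid => FooBar(valid.foo, row.bar)).getOrElse(row)
    } else {
      lastNotNullRow = Some(row)
      row
    }
  }
}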

Upvotes: 2
