Reputation: 17676
I am using Spark 2.0.1 and want to fill nan values with the last known good value in the column.
The only references for Spark I could find are Spark / Scala: forward fill with last observation and Fill in null with previously known good value with pyspark, which seem to use RDDs.
I would rather stay in the DataFrame / Dataset world and possibly handle multiple nan values. Is this possible?
My assumption is that the data (initially loaded from e.g. a CSV file) is ordered by time and that this order is preserved in the distributed setting, i.e. filling with the closest / last known good value is correct. Maybe filling with the previous value is enough, as for most records there are no 2 or more nan records in a row. Does this actually hold? The point is that a
myDf.sort("foo").show
would destroy any order, e.g. all null values will come first.
A small example:
import java.sql.{ Date, Timestamp }
import spark.implicits._

case class FooBar(foo: Date, bar: String)

val myDf = Seq(("2016-01-01","first"),("2016-01-02","second"),("2016-wrongFormat","noValidFormat"), ("2016-01-04","lastAssumingSameDate"))
  .toDF("foo","bar")
  .withColumn("foo", 'foo.cast("Date"))
  .as[FooBar]
Results in
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
| null| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
I would like to fix the value with the last known good value. How can I achieve this? The desired result is:
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
|2016-01-02| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
In my case, it would be good enough to fill in the value from the row above, as there are only very few faulty values.
I tried to add an index column:
val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate"))
.toDF("foo", "bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]
.withColumn("rowId", monotonically_increasing_id())
And then fill with the last value.
myDf.withColumn("fooLag", lag('foo, 1) over Window.orderBy('rowId)).show
But that produces the following warning: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. How could I introduce meaningful partitions? (A sketch of what I have in mind follows the output below.)
+----------+--------------------+-----+----------+
| foo| bar|rowId| fooLag|
+----------+--------------------+-----+----------+
|2016-01-01| first| 0| null|
|2016-01-02| second| 1|2016-01-01|
| null| noValidFormat| 2|2016-01-02|
|2016-01-04|lastAssumingSameDate| 3| null|
+----------+--------------------+-----+----------+
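Just to illustrate what I mean by meaningful partitions: a minimal sketch, assuming a hypothetical grouping column groupKey (it does not exist in the example data) that keeps related rows together, so the window no longer has to move everything into a single partition:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

// groupKey is hypothetical; it only illustrates how a partitioned window
// would avoid the "Moving all data to a single partition" warning
val byGroup = Window.partitionBy("groupKey").orderBy("rowId")
myDf.withColumn("fooLag", lag('foo, 1) over byGroup).show

But I do not see which column of the example could serve as such a key.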
Upvotes: 4
Views: 3399
Reputation: 63
Filling null fields with the last known non-null value: I tried this and it actually worked!
val dftxt1 = spark.read
  .option("header", "true")
  .option("sep", "\t")
  .csv("/sdata/ph/com/r/ph_com_r_ita_javelin/inbound/abc.txt")
  .toDF("line_name", "merge_key", "line_id")
dftxt1.select("line_name", "merge_key", "line_id").write.mode("overwrite").insertInto("dbname.tablename")
val df = spark.sql("select * from dbname.tablename")

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{last, monotonically_increasing_id}

// keep the original row order with an increasing id, then carry the last non-null value forward
val df1 = df.withColumn("rowId", monotonically_increasing_id())
val partitionWindow = Window.orderBy("rowId")
val df2 = df1.withColumn("line_id", last("line_id", ignoreNulls = true) over partitionWindow)
df2.show
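For reference, the same last(..., ignoreNulls = true) idea applied to the FooBar example from the question; a small sketch, assuming the myDf with the rowId column defined there (fooFilled is just an illustrative name), and keeping in mind that Window.orderBy without partitionBy still moves all data into a single partition:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.last

// the default frame (unbounded preceding to current row) makes
// last(..., ignoreNulls = true) carry the previous non-null value forward
val fillWindow = Window.orderBy("rowId")
myDf.withColumn("fooFilled", last("foo", ignoreNulls = true) over fillWindow).show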
Upvotes: 2
Reputation: 17676
This is an intermediate answer. However, it is not great, as no partitioning is used / all data is moved into a single partition. I am still looking for a better way to solve the problem.
df
  .withColumn("rowId", monotonically_increasing_id())
  .withColumn("replacement", lag('columnWithNull, 1) over Window.orderBy('rowId))
  .withColumn("columnWithNullReplaced",
    when($"columnWithNull".isNull, $"replacement").otherwise($"columnWithNull")
  )
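Applied to the example above, this looks roughly as follows; a sketch, assuming myDf already carries the rowId column (fooFilled is just an illustrative name):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, when}

myDf
  .withColumn("replacement", lag($"foo", 1) over Window.orderBy($"rowId"))
  .withColumn("fooFilled", when($"foo".isNull, $"replacement").otherwise($"foo"))
  .show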
I am working on building a better solution using mapPartitionsWithIndex; https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2 is not complete yet.
adding
if (i == 0) {
  lastNotNullRow = toCarryBd.value.get(i + 1).get
} else {
  lastNotNullRow = toCarryBd.value.get(i - 1).get
}
will lead to the desired result.
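For completeness, a minimal self-contained sketch of the mapPartitionsWithIndex idea; it is illustrative and not the gist itself. Instead of the i + 1 / i - 1 lookup above, it seeds each partition with the last non-null value of the closest preceding partition, and names such as toCarryBd only mirror the gist:

import java.sql.Date
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("forwardFill").getOrCreate()
import spark.implicits._

case class FooBar(foo: Date, bar: String)

val myDs = Seq(("2016-01-01", "first"), ("2016-01-02", "second"),
    ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate"))
  .toDF("foo", "bar")
  .withColumn("foo", 'foo.cast("Date"))
  .as[FooBar]

// 1) remember the last non-null foo of every partition
val lastPerPartition = myDs.rdd
  .mapPartitionsWithIndex { case (i, rows) =>
    val lastNonNull = rows.foldLeft(Option.empty[Date]) { (acc, fb) =>
      if (fb.foo != null) Some(fb.foo) else acc
    }
    Iterator((i, lastNonNull))
  }
  .collect()
  .toMap

val toCarryBd = spark.sparkContext.broadcast(lastPerPartition)

// 2) forward fill within each partition, seeded with the carry value of the
//    closest preceding partition that contains a non-null value
val filled = myDs.rdd.mapPartitionsWithIndex { case (i, rows) =>
  var lastNotNullRow: Option[Date] = ((i - 1) to 0 by -1)
    .flatMap(j => toCarryBd.value.getOrElse(j, None))
    .headOption
  rows.map { fb =>
    if (fb.foo != null) { lastNotNullRow = Some(fb.foo); fb }
    else fb.copy(foo = lastNotNullRow.orNull)
  }
}.toDS()

filled.show

The extra pass in step 1 only collects one value per partition, so unlike the window without partitionBy it does not move all rows to a single executor.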
Upvotes: 1