Pulah
Pulah

Reputation: 123

Imputing null values in spark dataframe, based on the row category, by fetching the values from another dataframe in Scala

So i have a dataframe as shown below, that has been stored as a temporary view by the name mean_value_gn5 so that i can query using sql(), whenever i need to fetch the data.

+-------+----+
|Species|Avgs|
+-------+----+
|    NO2|  43|
|    NOX|  90|
|     NO|  31|
+-------+----+

This dataframe stores the categorical average of the 'Species' rounded off to the nearest whole number using the ceil() function. I need to use these categorical averages to impute the missing values of column Value in my dataframe of interest clean_gn5. I created a new column Value_imp which would hold my final column with the imputed values.

I made an attempt to do so as follows:

clean_gn5 = clean_gn5.withColumn("Value_imp",
      when($"Value".isNull, sql("Select Avgs from mean_value_gn5 where Species = "+$"Species").select("Avgs").head().getLong(0).toInt)
        .otherwise($"Value"))

The above mentioned code runs, but the values are getting incorrectly imputed i.e. for the row containing Species as NO the value getting imputed is 43 instead of 31.

Prior to doing this I first checked if i was able to fetch the values correctly by executing the following:

println(sql("Select Avgs from mean_value_gn5 where Species = 'NO'").select("Avgs").head().getLong(0))

I am able to fetch the value correctly after hardcoding the Species and as per my understanding $"Species" should help me fetch the value corresponding to the Species column for that particular row.

Also I thought that probably i was missing the single quotes around the hardcoded Species value i.e. 'NO'. So I tried doing the following

 clean_gn5 = clean_gn5.withColumn("Value_imp",
      when($"Value".isNull, sql("Select Avgs from mean_value_gn5 where Species = '"+$"Species"+"'").select("Avgs").head().getLong(0).toInt)
        .otherwise($"Value"))

But that resulted in the following exception.

Exception in thread "main" java.util.NoSuchElementException: next on empty iterator

I am fairly new to Spark and Scala.

Upvotes: 0

Views: 318

Answers (1)

werner
werner

Reputation: 14845

Let's assume clean_gn5 contains the data

+-------+-----+
|Species|Value|
+-------+-----+
|    NO2|  2.3|
|    NOX|  1.1|
|     NO| null|
|    ABC|  4.0|
|    DEF| null|
|    NOX| null|
+-------+-----+

Joining clean_gn5 with mean_value_gn5 using a left join will result in

+-------+-----+----+
|Species|Value|Avgs|
+-------+-----+----+
|    NO2|  2.3|  43|
|    NOX|  1.1|  90|
|     NO| null|  31|
|    ABC|  4.0|null|
|    DEF| null|null|
|    NOX| null|  90|
+-------+-----+----+

On this dataframe you can apply per row the logic you have already given in your question and the result is (after dropping the Avgs column):

+-------+-----+---------+
|Species|Value|Value_imp|
+-------+-----+---------+
|    NO2|  2.3|      2.3|
|    NOX|  1.1|      1.1|
|     NO| null|     31.0|
|    ABC|  4.0|      4.0|
|    DEF| null|     null|
|    NOX| null|     90.0|
+-------+-----+---------+

The code:

clean_gn5.join(mean_value_gn5, Seq("Species"), "left")
  .withColumn("Value_imp", when('value.isNull, 'Avgs).otherwise('value))
  .drop("Avgs")
  .show()

Upvotes: 2

Related Questions