I have a DataFrame, shown below, that is registered as a temporary view named mean_value_gn5 so that I can query it with sql() whenever I need to fetch the data.
+-------+----+
|Species|Avgs|
+-------+----+
|    NO2|  43|
|    NOX|  90|
|     NO|  31|
+-------+----+
This DataFrame stores the categorical average for each Species, rounded up to the next whole number with the ceil() function. I need to use these per-Species averages to impute the missing values of the column Value in my DataFrame of interest, clean_gn5. I created a new column, Value_imp, to hold the final values, including the imputed ones.
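The question does not show how mean_value_gn5 was built. A minimal sketch, assuming the averages are taken over the Value column grouped by Species, could look like the following; the object BuildMeanView, the helper buildMeanView and the sample rows are hypothetical. Note that ceil() always rounds up (30.6 becomes 31), and for a double input it returns a long, which is why getLong(0) works on Avgs.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{avg, ceil}

object BuildMeanView {
  // Hypothetical helper: per-Species average of Value, rounded UP with ceil(),
  // registered as the temp view mean_value_gn5 so that sql() can query it.
  def buildMeanView(clean: DataFrame): DataFrame = {
    val means = clean
      .groupBy("Species")
      .agg(ceil(avg("Value")).as("Avgs")) // avg skips nulls; ceil of a double is a long
    means.createOrReplaceTempView("mean_value_gn5")
    means
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("mean-view").getOrCreate()
    import spark.implicits._

    // Made-up sample rows; the real clean_gn5 contents are not shown in the question.
    val clean = Seq(
      ("NO", Some(30.2)), ("NO", Some(31.0)), ("NO", None),
      ("NO2", Some(42.5)), ("NOX", Some(89.1))
    ).toDF("Species", "Value")

    buildMeanView(clean)
    spark.sql("SELECT Species, Avgs FROM mean_value_gn5 ORDER BY Species").show()
    spark.stop()
  }
}
```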
I made an attempt to do so as follows:
clean_gn5 = clean_gn5.withColumn("Value_imp",
when($"Value".isNull, sql("Select Avgs from mean_value_gn5 where Species = "+$"Species").select("Avgs").head().getLong(0).toInt)
.otherwise($"Value"))
The code above runs, but the values are imputed incorrectly: for a row whose Species is NO, the imputed value is 43 instead of 31.
Before this, I checked that I could fetch the values correctly by running:
println(sql("Select Avgs from mean_value_gn5 where Species = 'NO'").select("Avgs").head().getLong(0))
Hardcoding the Species this way returns the correct value, and my understanding was that $"Species" would fetch the value of the Species column for that particular row.
I also suspected that I was missing the single quotes around the Species value, as in the hardcoded 'NO', so I tried the following:
clean_gn5 = clean_gn5.withColumn("Value_imp",
when($"Value".isNull, sql("Select Avgs from mean_value_gn5 where Species = '"+$"Species"+"'").select("Avgs").head().getLong(0).toInt)
.otherwise($"Value"))
But that resulted in the following exception.
Exception in thread "main" java.util.NoSuchElementException: next on empty iterator
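Both symptoms have the same cause. Concatenating a String with a Column does not read that row's Species value: Scala converts the Column to a string, which yields the column name, so the query is built and executed once on the driver, before any row is processed. The unquoted version compares Species with itself (true for every row), head() returns whichever row comes first (43 in the question), and when() then uses that single number as a constant for every null. The quoted version compares Species with the literal 'Species', matches nothing, and head() throws NoSuchElementException. The sketch below reproduces this against the three-row view; the object WhyTheQueryBreaks and its methods are made up for illustration.

```scala
import org.apache.spark.sql.{Column, SparkSession}

object WhyTheQueryBreaks {
  // Both helpers build the query string exactly as the question does.
  // String + Column calls Column.toString, which gives the column *name*,
  // not the value stored in any particular row.
  def unquoted(species: Column): String =
    "Select Avgs from mean_value_gn5 where Species = " + species

  def quoted(species: Column): String =
    "Select Avgs from mean_value_gn5 where Species = '" + species + "'"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("why").getOrCreate()
    import spark.implicits._

    Seq(("NO2", 43L), ("NOX", 90L), ("NO", 31L)).toDF("Species", "Avgs")
      .createOrReplaceTempView("mean_value_gn5")

    println(unquoted($"Species"))
    println(spark.sql(unquoted($"Species")).count()) // 3: Species compared with itself
    println(spark.sql(quoted($"Species")).count())   // 0: so head() throws
    spark.stop()
  }
}
```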
I am fairly new to Spark and Scala.
Answer:
Let's assume clean_gn5 contains the following data:
+-------+-----+
|Species|Value|
+-------+-----+
|    NO2|  2.3|
|    NOX|  1.1|
|     NO| null|
|    ABC|  4.0|
|    DEF| null|
|    NOX| null|
+-------+-----+
A left join of clean_gn5 with mean_value_gn5 on Species gives:
+-------+-----+----+
|Species|Value|Avgs|
+-------+-----+----+
|    NO2|  2.3|  43|
|    NOX|  1.1|  90|
|     NO| null|  31|
|    ABC|  4.0|null|
|    DEF| null|null|
|    NOX| null|  90|
+-------+-----+----+
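A self-contained sketch of the join step, using the sample rows from the tables above. The names JoinAverages and withAverages are illustrative; the averages are read back through spark.table("mean_value_gn5"), on the assumption that only the temporary view (not a DataFrame variable) is at hand.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object JoinAverages {
  // Left join: every clean_gn5 row is kept; Avgs is null when a Species
  // (ABC and DEF here) has no entry in mean_value_gn5.
  def withAverages(clean: DataFrame, means: DataFrame): DataFrame =
    clean.join(means, Seq("Species"), "left")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("join").getOrCreate()
    import spark.implicits._

    // Sample data copied from the tables above.
    val clean = Seq(("NO2", Some(2.3)), ("NOX", Some(1.1)), ("NO", None),
      ("ABC", Some(4.0)), ("DEF", None), ("NOX", None)).toDF("Species", "Value")
    Seq(("NO2", 43L), ("NOX", 90L), ("NO", 31L)).toDF("Species", "Avgs")
      .createOrReplaceTempView("mean_value_gn5")

    // Read the averages through the temp view rather than a DataFrame variable.
    withAverages(clean, spark.table("mean_value_gn5")).show()
    spark.stop()
  }
}
```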
On this DataFrame you can apply, row by row, the same when/otherwise logic you already have in your question. After dropping the Avgs column, the result is:
+-------+-----+---------+
|Species|Value|Value_imp|
+-------+-----+---------+
|    NO2|  2.3|      2.3|
|    NOX|  1.1|      1.1|
|     NO| null|     31.0|
|    ABC|  4.0|      4.0|
|    DEF| null|     null|
|    NOX| null|     90.0|
+-------+-----+---------+
The code:
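import org.apache.spark.sql.functions.when
import spark.implicits._ // enables the $"column" syntax; assumes a SparkSession named spark
// Left join keeps every row of clean_gn5; Avgs is null where no average exists
clean_gn5.join(spark.table("mean_value_gn5"), Seq("Species"), "left")
  .withColumn("Value_imp", when($"Value".isNull, $"Avgs").otherwise($"Value"))
  .drop("Avgs")
  .show()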
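Since Value_imp is simply the first non-null of Value and Avgs, coalesce() from org.apache.spark.sql.functions expresses the same rule more compactly. The sketch below also adds a broadcast() hint, an optional choice that assumes the averages table stays small (one row per Species); the object ImputeWithCoalesce and its impute method are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{broadcast, coalesce, col}

object ImputeWithCoalesce {
  // coalesce returns its first non-null argument, so this matches
  // when(Value.isNull, Avgs).otherwise(Value). Spark widens the long Avgs
  // to double to match Value, hence 31.0 rather than 31 in the output.
  def impute(clean: DataFrame, means: DataFrame): DataFrame =
    clean.join(broadcast(means), Seq("Species"), "left") // broadcast: optional size hint
      .withColumn("Value_imp", coalesce(col("Value"), col("Avgs")))
      .drop("Avgs")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("impute").getOrCreate()
    import spark.implicits._

    val clean = Seq(("NO2", Some(2.3)), ("NOX", Some(1.1)), ("NO", None),
      ("ABC", Some(4.0)), ("DEF", None), ("NOX", None)).toDF("Species", "Value")
    Seq(("NO2", 43L), ("NOX", 90L), ("NO", 31L)).toDF("Species", "Avgs")
      .createOrReplaceTempView("mean_value_gn5")

    // Keep the result instead of only calling show(), so Value_imp can be reused.
    val cleanImputed = impute(clean, spark.table("mean_value_gn5"))
    cleanImputed.show()
    spark.stop()
  }
}
```

Rows whose Species has no average (DEF above) keep a null Value_imp, exactly as with the when/otherwise version.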