Reputation: 5
I have a DataFrame that contains:
+------------+----------+
|BaseFromYear|BaseToYear|
+------------+----------+
| 2013| 2013|
+------------+----------+
I need to check the difference between the two years and then check, against another DataFrame, whether the required year exists in the base years, so I created this query:
val df = DF_WE.filter($"id" === 3 && $"status" === 1)
  .select("BaseFromYear", "BaseToYear")
  .withColumn("diff_YY", $"BaseToYear" - $"BaseFromYear".cast(IntegerType))
  .withColumn("Baseyears", when($"diff_YY" === 0, $"BaseToYear"))
+------------+----------+-------+---------+
|BaseFromYear|BaseToYear|diff_YY|Baseyears|
+------------+----------+-------+---------+
| 2013| 2013| 0| 2013|
+------------+----------+-------+---------+
That gives the output above. But if BaseFromYear is 2014 and BaseToYear is 2017, the difference is 3, and I need Baseyears to be [2014, 2015, 2016, 2017]. In the next step I have a required year, say 2016, which I need to compare with the base years. I see there is an isin function; will it work?
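Stated outside Spark, the per-row result being asked for is an inclusive range of years plus a membership test. A minimal plain-Scala sketch of that logic (the helper names baseYears and containsYear are hypothetical, not part of any API):

```scala
// Hypothetical helpers illustrating the per-row logic; no Spark involved.
def baseYears(fromYear: Int, toYear: Int): List[Int] =
  (fromYear to toYear).toList // inclusive range: 2014 to 2017 covers 2014, 2015, 2016, 2017

def containsYear(fromYear: Int, toYear: Int, required: Int): Boolean =
  baseYears(fromYear, toYear).contains(required)

println(baseYears(2013, 2013))          // List(2013)  (the diff_YY == 0 case)
println(baseYears(2014, 2017))          // List(2014, 2015, 2016, 2017)
println(containsYear(2014, 2017, 2016)) // true
```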
Upvotes: 0
Views: 178
Reputation: 1186
I have added comments in the code; let me know if you need further explanation.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import spark.implicits._ // enables the $"col" syntax; assumes a SparkSession named `spark` (already imported in spark-shell)

// This is a user-defined function (UDF) that builds an array of Int from BaseFromYear to BaseToYear (inclusive)
val generateRange: (Int, Int) => Array[Int] = (baseFromYear: Int, baseToYear: Int) => (baseFromYear to baseToYear).toArray
val sqlfunc = udf(generateRange) // Wrap the Scala function as a Spark UDF

val df = DF_WE.filter($"id" === 3 && $"status" === 1)
  .select("BaseFromYear", "BaseToYear")
  .withColumn("diff_YY", $"BaseToYear" - $"BaseFromYear".cast(IntegerType))
  .withColumn("Baseyears", sqlfunc($"BaseFromYear", $"BaseToYear")) // use the UDF to populate the new column
df.show()

// Now, let's say we select the records that have 2016 in Baseyears
val filteredDf = df.where(array_contains(df("Baseyears"), 2016))
filteredDf.show()

// Baseyears is an array<int> column, so the UDF receives it as Seq[Int] (not Seq[Row];
// comparing an Int against Row elements would always return false)
val isIn: (Int, Seq[Int]) => Boolean = (num: Int, years: Seq[Int]) => years.contains(num)
val sqlIsIn = udf(isIn)
// "YY" must be a column of df holding the required year, e.g. added by joining with the other DataFrame
val filteredDfBasedOnAnotherCol = df.filter(sqlIsIn(df("YY"), df("Baseyears")))
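As a hedged alternative sketch, assuming Spark 2.4 or later (where sequence was added to org.apache.spark.sql.functions): the range UDF can be replaced by the built-in sequence, and the comparison with another DataFrame can be written as a join on Column.between. The sample rows, the required DataFrame and its YY column are hypothetical stand-ins, since the real schemas are not shown.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_contains, col, sequence}

// Local session for the sketch; inside spark-shell, getOrCreate() returns the existing `spark`.
val spark = SparkSession.builder().master("local[*]").appName("baseyears-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample data standing in for DF_WE and for the DataFrame holding required years.
val bases = Seq((3, 1, 2013, 2013), (3, 1, 2014, 2017))
  .toDF("id", "status", "BaseFromYear", "BaseToYear")
val required = Seq(2016, 2020).toDF("YY")

// 1) Built-in replacement for the UDF: sequence(start, stop) returns the inclusive
//    array [start, ..., stop]. The casts only matter if the years are stored as strings.
val withYears = bases
  .filter(col("id") === 3 && col("status") === 1)
  .withColumn("Baseyears",
    sequence(col("BaseFromYear").cast("int"), col("BaseToYear").cast("int")))
withYears.show(false)

// Rows whose base years include 2016.
val has2016 = withYears.where(array_contains(col("Baseyears"), 2016))
has2016.show(false)

// 2) Comparing with another DataFrame: a required year is among the base years exactly when
//    BaseFromYear <= YY <= BaseToYear, so a join on between needs no array at all.
val matched = bases.join(required, col("YY").between(col("BaseFromYear"), col("BaseToYear")))
matched.show(false)
```

Because the base years always form a contiguous inclusive range, the between join gives the same answer as building the array and testing membership, without materialising an array per row. The array column is still handy if you want to display the individual years or explode them into rows.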
Upvotes: 1