palla

Reputation: 5

Spark: get a range of years as an array for comparison

I have a DataFrame that contains:

+------------+----------+
|BaseFromYear|BaseToYear|
+------------+----------+
|        2013|      2013|
+------------+----------+

I need to compute the difference between the two years and then check, against another DataFrame, whether a required year falls within the base years. So I wrote this query:

val df = DF_WE.filter($"id" === 3 && $"status" === 1)
  .select("BaseFromYear", "BaseToYear")
  .withColumn("diff_YY", $"BaseToYear" - $"BaseFromYear".cast(IntegerType))
  .withColumn("Baseyears", when($"diff_YY" === 0, $"BaseToYear"))
 +------------+----------+-------+---------+
 |BaseFromYear|BaseToYear|diff_YY|Baseyears|
 +------------+----------+-------+---------+
 |        2013|      2013|      0|     2013|
 +------------+----------+-------+---------+

This gives the output above. But if BaseFromYear is 2014 and BaseToYear is 2017, the difference is 3, and I need Baseyears to be [2014, 2015, 2016, 2017]. In the next step I have a required year, say 2016, that I need to compare against the base years. I see there is an isin function; will it work here?

Upvotes: 0

Views: 178

Answers (1)

Tawkir

Reputation: 1186

I have added comments in the code; let me know if you need further explanation.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

// A plain Scala function that builds an inclusive array of Int from BaseFromYear to BaseToYear
val generateRange: (Int, Int) => Array[Int] = (baseFromYear: Int, baseToYear: Int) => (baseFromYear to baseToYear).toArray
val sqlfunc = udf(generateRange) // Wrapping the function as a UDF usable in DataFrame expressions

val df = DF_WE.filter($"id" === 3 && $"status" === 1)
  .select("BaseFromYear", "BaseToYear")
  .withColumn("diff_YY", $"BaseToYear" - $"BaseFromYear".cast(IntegerType))
  .withColumn("Baseyears", sqlfunc($"BaseFromYear", $"BaseToYear")) // using the UDF to populate new columns

df.show()
// Now let's say we want the records that have 2016 in Baseyears
val filteredDf = df.where(array_contains(df("Baseyears"), 2016))
filteredDf.show()

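// To compare against another column instead of a literal, use a second UDF.
// An array<int> column reaches a Scala UDF as Seq[Int], so that is the parameter type to use.
val isIn: (Int, Seq[Int]) => Boolean = (num: Int, years: Seq[Int]) => years.contains(num)
val sqlIsIn = udf(isIn)

// "YY" is assumed to be an integer column of df holding the required year;
// it must be included in the select above for this line to resolve.
val filteredDfBasedOnAnotherCol = df.filter(sqlIsIn(df("YY"), df("Baseyears")))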
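
If you are on Spark 2.4 or later, the same result may be reachable without UDFs, using the built-in sequence and array_contains functions. The snippet below is only a sketch in spark-shell style: the sample rows, the helper withBaseYears, and the "YY" column are made up for illustration and are not part of the original data. When the required year sits in a column, a plain between check on the two bounds avoids building the array at all.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array_contains, col, sequence}

// Hypothetical helper: adds Baseyears as an inclusive array of years
// using the built-in sequence function (available since Spark 2.4).
def withBaseYears(df: DataFrame): DataFrame =
  df.withColumn("Baseyears", sequence(col("BaseFromYear"), col("BaseToYear")))

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("base-years-sketch")
  .getOrCreate()
import spark.implicits._

// Made-up sample rows standing in for the filtered DF_WE; YY is an assumed "required year" column.
val sample = Seq(
  (2013, 2013, 2013),
  (2014, 2017, 2016),
  (2014, 2017, 2019)
).toDF("BaseFromYear", "BaseToYear", "YY")

val withYears = withBaseYears(sample)

// Rows whose Baseyears array contains the literal year 2016
val has2016 = withYears.where(array_contains(col("Baseyears"), 2016))

// Rows whose own YY value lies between the two bounds; no array needed
val yyInRange = withYears.where(col("YY").between(col("BaseFromYear"), col("BaseToYear")))

has2016.show()
yyInRange.show()
```

The array column is still useful if later steps need the individual years (for example with explode); for a pure membership test, the between form is usually the simpler choice.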

Upvotes: 1
