Reputation: 1129
I am trying to apply substring(column, numOne, numTwo) to a given original DataFrame and build a new DataFrame by taking the UNION of all the subsets produced by those substring(column, numOne, numTwo) calls. Below is the code I've come up with:
// Entry point: configures logging, obtains a SparkSession and builds the sample data used below.
def main(args: Array[String]): Unit = {
// Set the "org" logger to ERROR so that only errors are logged.
Logger.getLogger("org").setLevel(Level.ERROR)
// Get (or create) the SparkSession, configured with master "local[*]".
val spark = SparkSession
.builder()
.appName("PopularMoviesDS")
.config("spark.sql.warehouse.dir", "file:///C:/temp")
.master("local[*]")
.getOrCreate()
// Number of substring slices to take; dataFrameCreatorOrg uses it as the highest key of its lookup map.
var swing = 2
// One-row sample DataFrame with the columns id, name, city and country.
val dataframeInt = spark.createDataFrame(Seq(
(1, "Chandler", "Pasadena", "US")
)).toDF("id", "name", "city", "country")
// Empty DataFrame extended with a single null-literal column called "name".
// NOTE(review): dataFrameCreatorOrg builds its own accumulator, so this outer value looks unused in this snippet.
var returnDf:DataFrame = spark.emptyDataFrame.withColumn("name",functions.lit(null))
/** Builds one DataFrame by unioning a substring slice of `name` for every key from `swing` down to 1.
  *
  * For each key, the lookup table supplies the (start, length) arguments passed to
  * Spark SQL's `substring`. The slices are appended to an empty seed frame whose only
  * column is called "name", so the result keeps that column name.
  *
  * The outer `swing` is only read, never decremented, so calling this function
  * again produces the same rows instead of an empty frame.
  *
  * @param df input DataFrame; must contain a `name` column
  * @return the union of all slices, highest key first
  */
def dataFrameCreatorOrg(df: DataFrame): DataFrame = {
  val bounds: Map[Int, Seq[String]] = Map(1 -> Seq("1", "4"), 2 -> Seq("2", "5"))
  // Zero-row seed that fixes the output column name to "name".
  val seed: DataFrame = spark.emptyDataFrame.withColumn("name", functions.lit(null))
  // Walk the keys swing, swing-1, ..., 1 without mutating any shared state.
  (swing to 1 by -1).foldLeft(seed) { (acc, key) =>
    val slice = bounds(key)
    acc.union(df.selectExpr(s"substring(name,${slice(0)},${slice(1)})"))
  }
}
// Call the function defined above (dataFrameCreatorOrg) and print the unioned slices.
dataFrameCreatorOrg(dataframeInt).show()
+-----+
| name|
+-----+
|handl|
| Chan|
+-----+
The above code works as I expected, but I want to achieve the same result using tail recursion. Code below:
// Slice counter; dataFrameCreator looks its bounds up under this key.
var swing: Int = 2

// One-row sample table (id, name, city, country).
val dataframeInt = {
  val sampleRows = Seq((1, "Chandler", "Pasadena", "US"))
  spark.createDataFrame(sampleRows).toDF("id", "name", "city", "country")
}

// Shared accumulator that dataFrameCreator appends to: no rows, one null-literal column "name".
var returnDf: DataFrame = {
  val noRows = spark.emptyDataFrame
  noRows.withColumn("name", functions.lit(null))
}
/** Appends one substring slice of `name` to the shared `returnDf` accumulator and returns it.
  *
  * The slice bounds are looked up under the current value of the outer `swing`;
  * nothing in this function changes `swing`. Every call reassigns the outer
  * `returnDf`, so rows added by earlier calls are still present in the returned frame.
  *
  * @param df DataFrame to slice; must contain a `name` column
  * @return the updated accumulator
  */
def dataFrameCreator(df: DataFrame): DataFrame = {
  val bounds: Map[Int, Seq[String]] = Map(1 -> Seq("1", "4"), 2 -> Seq("2", "5"))
  val slice = bounds(swing)
  val sliced = df.selectExpr(s"substring(name,${slice(0)},${slice(1)})")
  returnDf = returnDf.union(sliced)
  returnDf
}
/** Calls `dataFrameCreator(dataframeInt)` once for every value of `num` from the
  * starting value down to 0, then returns the frame produced by the last call.
  *
  * Neither `num` nor the incoming `df` is passed on to `dataFrameCreator`: `num`
  * only counts the remaining calls (a start of 2 therefore means three calls),
  * and `df` is handed back unchanged once `num` drops below 0.
  *
  * @param num remaining-call counter; recursion stops when it is negative
  * @param df  result carried into this step
  * @return `df` once `num` is negative
  */
@tailrec
def bigUnionHelper(num: Int, df: DataFrame): DataFrame =
  num match {
    case n if n < 0 => df
    case n          => bigUnionHelper(n - 1, dataFrameCreator(dataframeInt))
  }

// Start the counter at swing and print the resulting frame.
bigUnionHelper(swing, dataframeInt).show()
//Result:
+-----+
| name|
+-----+
|handl|
|handl|
|handl|
+-----+
I totally get that there is room for optimization, but I am unable to figure out why the tail-recursive bigUnionHelper is not working and does not give the same result as the first function. Any help is appreciated. Thank you so much in advance.
Upvotes: 1
Views: 1143
Reputation: 1461
I think it should be this way.
// Highest key of the bounds table to slice with; passed to bigUnionHelper as `num`.
val swing: Int = 2

// One-row sample table (id, name, city, country).
val dataframeInt = {
  val sampleRows = Seq((1, "Chandler", "Pasadena", "US"))
  spark.createDataFrame(sampleRows).toDF("id", "name", "city", "country")
}
/** Unions one substring slice of `name` per key, for the keys `num` down to 1.
  *
  * @param df  input DataFrame; must contain a `name` column
  * @param num highest key of the bounds table to use; must be at least 1,
  *            because reducing an empty list of slices throws
  * @return the union of all slices, lowest key first
  */
def bigUnionHelper(df: DataFrame, num: Int): DataFrame = {
  // (start, length) arguments for substring, per key. Built once rather than on every recursive step.
  val bounds: Map[Int, Seq[String]] = Map(1 -> Seq("1", "4"), 2 -> Seq("2", "5"))

  // Collects the slices for the keys num..1; prepending leaves the list in ascending key order.
  @tailrec
  def dataFrameCreator(df: DataFrame, num: Int, acc: List[DataFrame] = List()): List[DataFrame] = {
    if (num < 1) acc
    else {
      val slice = bounds(num)
      // Both bounds come from the current key `num`. Taking the length from the outer
      // `swing` would give every slice the last key's length ("Chand" instead of "Chan").
      val tempDf = df.selectExpr(s"substring(name,${slice.head},${slice(1)})")
      dataFrameCreator(df, num - 1, tempDf :: acc)
    }
  }

  dataFrameCreator(df, num).reduce(_ union _)
}

// Build the union for the keys swing..1 and print it.
bigUnionHelper(dataframeInt, swing).show()
Upvotes: 2