Reputation: 67
I'm building a tail-recursive function that reads multiple HDFS paths and merges all of them into a single DataFrame. The function works perfectly as long as all the paths exist. If one does not, the function fails and does not finish joining the data from the paths that do exist. To solve this problem I have tried to handle the error using try/catch, but without success.
The error says: could not optimize @tailrec annotated method loop: it contains a recursive call not in tail position
My function is:
def getRangeData(toOdate: String, numMonths: Int, pathRoot: String, ColumnsTable: List[String]): DataFrame = {
  val dataFrameNull = spark.createDataFrame(spark.sparkContext.emptyRDD[Row],
    StructType((ColumnsTable :+ "odate").map(columnName => StructField(columnName, StringType, true))))
  val rangePeriod = getRangeDate(numMonths, toOdate)
  @tailrec
  def unionRangeData(rangePeriod: List[LocalDate], pathRoot: String, df: DataFrame = dataFrameNull): DataFrame = {
    try {
      if (rangePeriod.isEmpty) {
        df
      }
      else {
        val month = "%02d".format(rangePeriod.head.getMonthValue)
        val year = rangePeriod.head.getYear
        val odate = rangePeriod.head.toString
        val path = s"${pathRoot}/partition_data_year_id=${year}/partition_data_month_id=${month}"
        val columns = ColumnsTable.map(columnName => trim(col(columnName)).as(columnName))
        val dfTemporal = spark.read.parquet(path).select(columns: _*).withColumn("odate", lit(odate).cast("date"))
        unionRangeData(rangePeriod.tail, pathRoot, df.union(dfTemporal))
      }
    } catch {
      case e: Exception =>
        logger.error("path not exist")
        dataFrameNull
    }
  }
  unionRangeData(rangePeriod, pathRoot)
}
def getRangeDate(numMonths: Int, toOdate: String, listDate: List[LocalDate] = List()): List[LocalDate] = {
  if (numMonths == 0) {
    listDate
  }
  else {
    getRangeDate(numMonths - 1, toOdate, LocalDate.parse(toOdate).plusMonths(1).minusMonths(numMonths) :: listDate)
  }
}
In advance, thank you very much for your help.
Upvotes: 1
Views: 275
Reputation: 21
I would suggest you remove the try-catch construct entirely from the function and use it instead at the call site at the bottom of getRangeData.
Alternatively, you can use scala.util.Try to wrap the call: Try(unionRangeData(rangePeriod, pathRoot)), and use one of its combinators to perform your logging or provide a default value in the error case.
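As a rough illustration of wrapping the call in Try, here is a minimal sketch that runs without Spark. The names sumAll and sumOrDefault are hypothetical stand-ins for unionRangeData and its caller, and a NumberFormatException plays the role of the missing-path error:

```scala
import scala.annotation.tailrec
import scala.util.Try

object CallSiteTry {
  // Hypothetical stand-in for the recursive union: sums the numbers and
  // throws on any non-numeric entry, much as a missing path would throw.
  @tailrec
  def sumAll(items: List[String], acc: Int = 0): Int = items match {
    case Nil          => acc
    case head :: tail => sumAll(tail, acc + head.toInt) // tail position: no try around it
  }

  // The error handling lives at the call site, outside the recursion.
  def sumOrDefault(items: List[String], default: Int): Int =
    Try(sumAll(items))
      .recover { case e: NumberFormatException =>
        println(s"Falling back to default: ${e.getMessage}")
        default
      }
      .get
}
```

Keep in mind that with this shape a single failure replaces the whole result with the default, so anything accumulated before the failure is lost.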
Related post which explains why the Scala compiler cannot perform tail call optimization inside try-catch: Why won't Scala optimize tail call with try/catch?
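In short, a call made inside a try block is not the last thing the method does, because the exception handler still has to stay active around it. If the goal is to keep the data from the paths that do exist, one option is to wrap only the read in Try and leave the recursive call outside of it. The sketch below assumes a hypothetical readPath helper and an in-memory Map in place of spark.read.parquet and HDFS, so it is an outline of the idea rather than a drop-in replacement:

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object SkipMissingPaths {
  // Hypothetical stand-in for spark.read.parquet(path): throws for unknown paths.
  def readPath(store: Map[String, List[Int]], path: String): List[Int] =
    store.getOrElse(path, throw new java.io.FileNotFoundException(s"Path does not exist: $path"))

  @tailrec
  def unionAll(store: Map[String, List[Int]], paths: List[String], acc: List[Int] = Nil): List[Int] =
    paths match {
      case Nil => acc
      case path :: rest =>
        // Only the read is wrapped in Try; a failed read leaves acc unchanged.
        val next = Try(readPath(store, path)) match {
          case Success(rows) => acc ++ rows
          case Failure(e) =>
            println(s"Skipping $path: ${e.getMessage}")
            acc
        }
        // The recursive call sits outside any try, so it stays in tail position.
        unionAll(store, rest, next)
    }
}
```

In the original function the same idea would mean wrapping just the line that builds dfTemporal and passing either df.union(dfTemporal) or the unchanged df to the next call.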
Upvotes: 2