Shankar

Reputation: 8967

Spark SQL DataFrame - Exception handling

In our application, most of our code just applies filter, group by, and aggregate operations to a DataFrame and saves the result to a Cassandra database.

As in the code below, we have several methods that perform the same kinds of operations (filter, group by, join, agg) on different sets of fields. Each returns a DataFrame, which is then saved to a Cassandra table.

Sample code is:

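import java.time.LocalDateTime
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, sum}

// Transformations only: nothing is executed here.
val filteredDF = df.filter(col("hour") <= LocalDateTime.now().getHour)
    .groupBy("country")
    .agg(sum(col("volume")) as "pmtVolume")

saveToCassandra(filteredDF)

def saveToCassandra(df: DataFrame): Unit = {
    try {
        // save() is the action that triggers execution.
        df.write.format("org.apache.spark.sql.cassandra")
            .options(Map("table" -> "tableName", "keyspace" -> keyspace))
            .mode("append").save()
    }
    catch {
        case e: Throwable => log.error(e)
    }
}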

Since I trigger the action by saving the DataFrame to Cassandra, I assume I only need to handle exceptions on that line, as per this thread.

If an exception occurs, I can see it in the detailed Spark log by default.

Do I really have to surround the filter and group by code with Try or try/catch?

I don't see any exception handling in the Spark SQL DataFrame API examples.

How do I use Try on the saveToCassandra method? It returns Unit.

Upvotes: 0

Views: 9007

Answers (2)

RockSolid

Reputation: 536

There is no point in wrapping the lazy DAG in try/catch.
You would need to wrap the lambda function in Try().
Unfortunately, AFAIK there is no way to do row-level exception handling in DataFrames.

You can use an RDD or Dataset instead, as mentioned in the answer to this post: Apache Spark exception handling
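As a rough illustration of the RDD route, here is a minimal sketch that wraps a per-row function in Try and then separates the successes from the failures. The object name, `parseVolume`, and the sample input are all made up for the example; it assumes Spark running in local mode and is not taken from the linked post.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.{Failure, Success, Try}

object RowLevelTrySketch {
  // Hypothetical per-row step that can fail on bad input.
  def parseVolume(raw: String): Try[Long] = Try(raw.trim.toLong)

  // Returns (successfully parsed values, raw inputs that failed).
  def split(spark: SparkSession, raw: Seq[String]): (Seq[Long], Seq[String]) = {
    import spark.implicits._

    // Drop to the RDD so each element can carry its own Try result.
    val attempts = raw.toDS().rdd.map(s => (s, parseVolume(s)))

    // RDD.collect with a partial function filters and maps in one step;
    // the trailing collect() brings the results back to the driver.
    val good = attempts.collect { case (_, Success(v)) => v }.collect().toSeq
    val bad  = attempts.collect { case (s, Failure(_)) => s }.collect().toSeq
    (good, bad)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("row-level-try")
      .getOrCreate()
    try {
      val (good, bad) = split(spark, Seq("10", "oops", "32"))
      println(s"parsed: $good, rejected: $bad")
    } finally spark.stop()
  }
}
```

The bad rows are kept next to their original input, so they can be logged or written somewhere else instead of failing the whole job.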

Upvotes: 1

himanshuIIITian

Reputation: 6095

You don't really need to surround the filter and group by code with Try or try/catch. Since all of these operations are transformations, they don't get executed until an action is performed on them, such as the save in saveToCassandra in your case.

However, if an error occurs while filtering, grouping, or aggregating the DataFrame, the catch clause in the saveToCassandra function will log it, because that is where the action is performed.
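To make this concrete, and to show one way of using Try on a method that returns Unit, here is a minimal sketch. It assumes Spark in local mode; `saveSafely`, the `checked` UDF, and the sample data are hypothetical, and `collect()` stands in for the Cassandra write so the example does not need a cluster. The method returns `Try[Unit]`, so the caller can pattern match on the outcome instead of the error only being logged.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, sum, udf}
import scala.util.{Failure, Success, Try}

object LazyFailureSketch {
  // Hypothetical stand-in for saveToCassandra: collect() is the action here.
  def saveSafely(df: DataFrame): Try[Unit] = Try {
    df.collect()
    ()
  }

  def run(spark: SparkSession): Try[Unit] = {
    import spark.implicits._
    val df = Seq(("IN", 10L), ("US", -5L)).toDF("country", "volume")

    // Hypothetical UDF that fails at runtime for some rows.
    val checked = udf((v: Long) =>
      if (v < 0) throw new IllegalArgumentException(s"negative volume: $v") else v)

    // Only the plan is built here; the UDF has not run yet.
    val aggregated = df
      .groupBy("country")
      .agg(sum(checked(col("volume"))) as "pmtVolume")

    // The UDF failure surfaces at the action and ends up inside the Try.
    saveSafely(aggregated)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("lazy-failure")
      .getOrCreate()
    try {
      run(spark) match {
        case Success(_) => println("saved")
        case Failure(e) => println(s"save failed: ${e.getClass.getSimpleName}")
      }
    } finally spark.stop()
  }
}
```

One caveat: Spark resolves column names when the transformation is defined, so a reference to a column that does not exist throws an AnalysisException right away, before any action runs. If that matters, the transformations can be built inside the same Try.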

Upvotes: 0
