Reputation: 8967
In our application, most of our code just applies filter, group by and aggregate operations on a DataFrame and saves the result to a Cassandra database.
Like the code below, we have several methods that do the same kind of operations [filter, group by, join, agg] on different numbers of fields and return a DataFrame, which is then saved to Cassandra tables.
Sample code is:
val filteredDF = df.filter(col("hour") <= LocalDateTime.now().getHour())
  .groupBy("country")
  .agg(sum(col("volume")) as "pmtVolume")

saveToCassandra(filteredDF)
def saveToCassandra(df: DataFrame) {
  try {
    df.write.format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "tableName", "keyspace" -> keyspace))
      .mode("append").save()
  }
  catch {
    case e: Throwable => log.error(e)
  }
}
Since I am calling the action by saving the DF to Cassandra, I hope I need to handle the exception only on that line, as per this thread.
If I get any exception, I can see it in the detailed Spark log by default.
Do I really have to surround the filter and group by code with Try or try/catch?
I don't see any examples of exception handling in the Spark SQL DataFrame API.
How do I use Try on the saveToCassandra method? It returns Unit.
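Here is roughly what I have in mind, a minimal sketch assuming saveToCassandraUnsafe is a hypothetical variant of the method above without the internal try/catch, so the failure can propagate to the caller:

import scala.util.{Failure, Success, Try}

// Try of a Unit-returning call gives a Try[Unit] that can be matched on.
Try(saveToCassandraUnsafe(filteredDF)) match {
  case Success(_) => log.info("saved to Cassandra")
  case Failure(e) => log.error("save to Cassandra failed: " + e.getMessage)
}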
Upvotes: 0
Views: 9007
Reputation: 536
There is no point wrapping the lazy DAG in try/catch.
You would need to wrap the lambda function in Try().
Unfortunately, AFAIK there is no way to do row-level exception handling on DataFrames.
You can use RDD or Dataset, as mentioned in the answer to this post: apache spark exception handling
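A rough sketch of that row-level approach on a Dataset (the case class, file path and parse logic here are invented for illustration):

import org.apache.spark.sql.{Dataset, SparkSession}
import scala.util.Try

case class Payment(country: String, volume: Double)

val spark = SparkSession.builder().appName("row-level-try").getOrCreate()
import spark.implicits._

// Read raw lines, then parse each one inside Try so a single bad row
// does not fail the whole job.
val raw: Dataset[String] = spark.read.textFile("payments.csv")

val parsed: Dataset[Payment] = raw.flatMap { line =>
  Try {
    val Array(country, volume) = line.split(",")
    Payment(country, volume.toDouble)
  }.toOption.toList // a Failure becomes an empty list, so the bad row is dropped
}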
Upvotes: 1
Reputation: 6095
You don't really need to surround the filter and group by code with Try or try/catch. Since all of these operations are transformations, they don't get executed until an action is performed on them, like saveToCassandra in your case.
However, if an error occurs while filtering, grouping or aggregating the dataframe, the catch clause in the saveToCassandra function will log it, since that is where the action is performed.
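To make the laziness concrete, here is a rough sketch (the failing UDF and the negative-volume check are made up; df, keyspace and log are as in the question):

import org.apache.spark.sql.functions.{col, udf}

// The UDF throws for bad input, but defining the transformation raises nothing:
// Spark only records it in the DAG.
val checkVolume = udf((v: Double) => {
  if (v < 0) throw new IllegalArgumentException("negative volume: " + v)
  v
})

val checked = df.withColumn("volume", checkVolume(col("volume"))) // no exception here

// The job only runs, and the exception only reaches the driver, at the action,
// which is why the try/catch around the write is where it gets logged.
try {
  checked.write.format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "tableName", "keyspace" -> keyspace))
    .mode("append").save()
} catch {
  case e: Throwable => log.error(e)
}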
Upvotes: 0