Reputation: 754
I've been using the Spark Dataset API to perform operations on JSON and extract certain fields as needed. However, when the specification I provide to tell Spark which field to extract is wrong, Spark throws an
org.apache.spark.sql.AnalysisException
How can unchecked runtime exceptions like this be handled in a distributed processing scenario? I understand that wrapping the call in a try-catch would get things sorted, but what is the recommended way to handle such a scenario?
dataset = dataset.withColumn(current, functions.explode(dataset.col(parent + Constants.PUNCTUATION_PERIOD + child.substring(0, child.length() - 2))));
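For context, here is a minimal, self-contained sketch of how this failure typically surfaces (the input path and field names are made up for illustration, not taken from the job above): a wrong column path cannot be resolved against the schema, so Spark fails on the driver at analysis time with an AnalysisException, before any job is executed.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class ExplodeDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("analysis-exception-demo")
                .master("local[*]")
                .getOrCreate();

        // Hypothetical JSON input; assume each record has a nested array field "parent.children".
        Dataset<Row> dataset = spark.read().json("/tmp/input.json");

        // If the column path is wrong (e.g. a typo like "parent.childs"), col() cannot
        // resolve it and throws org.apache.spark.sql.AnalysisException at analysis time.
        dataset = dataset.withColumn("child",
                functions.explode(dataset.col("parent.children")));
        dataset.show();
    }
}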
Upvotes: 3
Views: 5011
Reputation: 15549
In Scala, you should simply wrap the call in a Try and manage the Failure. Something like:
val result = Try(executeSparkCode()) match {
  case s @ Success(_) => s
  case Failure(error: AnalysisException) => Failure(new MyException(error))
  case f @ Failure(_) => f // leave other failures unchanged
}
Note 1: If your question is about how to manage exceptions in Scala in general, there is a lot of documentation and many posts on the subject (e.g. don't throw). For example, you can check this answer (of mine)
Note 2: I don't have a Scala dev env right here, so I didn't test this code.
In Java there is a tricky situation, however: the compiler doesn't expect an AnalysisException to be thrown (the Scala method doesn't declare it), so you cannot catch this exception specifically. This is probably a Scala/Java mismatch, because Scala doesn't track checked exceptions. What I did was:
try {
    return executeSparkCode();
} catch (Exception ex) {
    if (ex instanceof AnalysisException) {
        throw new MyException(ex);
    } else {
        throw ex; // unmanaged exceptions
    }
}
Note: In my case, I also tested the content of the error message for a specific case that I had to manage (namely "path does not exist"), in which case I return an empty dataset instead of throwing another exception. I was looking for a better solution and happened to get here...
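A minimal sketch of that variant, combining the instanceof check with the message test; executeSparkCode(), MyException, SafeSparkCall and the input path are placeholders for illustration, not Spark API:

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SafeSparkCall {

    // Unchecked wrapper so callers are not forced to declare it.
    static class MyException extends RuntimeException {
        MyException(Throwable cause) { super(cause); }
    }

    private final SparkSession spark;

    SafeSparkCall(SparkSession spark) {
        this.spark = spark;
    }

    // Placeholder for the real job; may fail at analysis time,
    // e.g. when the input path does not exist.
    Dataset<Row> executeSparkCode() {
        return spark.read().json("/tmp/maybe-missing.json");
    }

    Dataset<Row> executeSafely() {
        try {
            return executeSparkCode();
        } catch (Exception ex) {
            if (ex instanceof AnalysisException) {
                String msg = ex.getMessage();
                // Treat a missing input path as "no data" rather than an error.
                if (msg != null && msg.toLowerCase().contains("path does not exist")) {
                    return spark.emptyDataFrame();
                }
                throw new MyException(ex);
            }
            throw ex; // unmanaged exceptions (compiles thanks to Java 7 precise rethrow)
        }
    }
}

Keying off the message text is brittle across Spark versions, so treat this as a pragmatic workaround rather than a general solution.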
Upvotes: 1