Kevin Gomez

Reputation: 343

How to handle an AnalysisException on Spark SQL?

I am trying to execute a list of queries in Spark, but if a query does not run correctly, Spark throws the following error: AnalysisException: "ALTER TABLE CHANGE COLUMN is not supported for changing ...

This is part of my code (I'm using Python and Spark SQL on Databricks):

for index, row in df_tables.iterrows():
  query = row["query"]
  print ("Executing query: ")
  try:
      spark.sql(query)
      print ("Query executed")
  except (ValueError, RuntimeError, TypeError, NameError):
      print("Unable to process your query dude!!")
  else:
      # do another thing
      pass

Is there any way to catch that exception? ValueError, RuntimeError, TypeError, and NameError don't seem to work. There isn't much information about this on the Spark website.

Upvotes: 30

Views: 44718

Answers (5)

s_pike

Reputation: 2123

For pyspark >= 3.4.0, exceptions can be caught using the PySpark error framework in pyspark.errors. The AnalysisException required in this example can be imported as follows:

from pyspark.errors import AnalysisException

The error can then be caught:

try:
    spark.sql(query)
    print ("Query executed")
except AnalysisException:
    print("Your query raised an AnalysisException")

If you want to be more specific about which AnalysisException you are catching, you need to look into the error message:

try:
    spark.sql(query)
    print ("Query executed")
except AnalysisException as e:
    if "ALTER TABLE CHANGE COLUMN is not supported for changing" is in str(e):
        print("Your query raised a specific AnalysisException")
    else:
        raise e

Upvotes: 1

WagnerAlbJr

Reputation: 340

I want to propose a way to catch specific exceptions. I needed to find out whether a table already existed. The simplest way I found to do this is shown below. Of course, this can break if the Spark maintainers change the exception message, but I don't think they have a reason to do that in this case.

import pyspark.sql.utils

try:
    spark.read.parquet(SOMEPATH)
except pyspark.sql.utils.AnalysisException as e:
    if "Path does not exist:" in str(e):
        # Match the specific message of the exception.
        pass  # run some code to address this specific case
    else:
        # If this is not the AnalysisException we were waiting for,
        # re-raise it.
        raise
except Exception as e:
    # Any other exception can be caught like this.
    print(e)
    raise

Upvotes: 7

Brett Koblinger

Reputation: 541

I found AnalysisException defined in pyspark.sql.utils. https://spark.apache.org/docs/3.0.1/api/python/_modules/pyspark/sql/utils.html

import pyspark.sql.utils
try:
    spark.sql(query)
    print ("Query executed")
except pyspark.sql.utils.AnalysisException:
    print("Unable to process your query dude!!")

Upvotes: 54

Manish

Reputation: 1157

You can modify the try/except statement as below:

try:
  spark.sql(query)
  print ("Query executed")
except Exception as x:
  print("Unable to process your query dude!!" + \
        "\n" + "ERROR : " + str(x)) 

Upvotes: 6

Piyush Patel

Reputation: 1751

I think it depends on your requirements. If you're running a full workflow with these queries and you want it to continue past failures, then your code will work fine. But if you want your workflow or data pipeline to fail, then you should re-raise or exit from that except block.
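A minimal sketch of that idea, assuming the df_tables loop and spark session from the question: collect the failures and re-raise at the end so the pipeline still fails instead of silently swallowing errors.

from pyspark.sql.utils import AnalysisException  # or pyspark.errors on >= 3.4.0

failed = []
for index, row in df_tables.iterrows():
    try:
        spark.sql(row["query"])
    except AnalysisException as e:
        # remember which query failed and why
        failed.append((index, str(e)))

if failed:
    # fail the workflow/pipeline instead of passing through silently
    raise RuntimeError("{} queries failed: {}".format(len(failed), failed))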

You may not get the exact exception type, but you can definitely get an overview using

except Exception as x:
  print(str(x)) 

You can use the logging module to put more information in the logs for further investigation.
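For example, a rough sketch with the standard logging module (the logger name and basicConfig setup here are just illustrative):

import logging
from pyspark.sql.utils import AnalysisException

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("query_runner")  # hypothetical logger name

try:
    spark.sql(query)
    logger.info("Query executed")
except AnalysisException:
    # logger.exception records the message plus the full traceback
    logger.exception("Unable to process your query dude!!")
    raise  # re-raise so the pipeline still fails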

Upvotes: 0
