Reputation: 87
from pyspark.sql.functions import *
from pyspark.sql.types import *
I am trying to cast a DataFrame column using df.column.cast(ShortType()), but when I insert the value 99999 it is silently converted to null without any error being thrown. Can you suggest an approach that raises an error during the conversion instead?
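A minimal sketch of the behaviour I am seeing (the DataFrame and column names are only examples; in my case the values arrive as strings):
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("99999",), ("1",)], ["value"])
# 99999 does not fit into ShortType (-32768..32767), but the cast just yields null
df.select(df.value.cast(ShortType()).alias("value_short")).show()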
Upvotes: 2
Views: 3995
Reputation: 21
In addition to using a custom type-checker UDF, you can make use of pyspark.sql.functions.raise_error. For example (col is the column to cast, ty the target type, and err_msg the error message):
col = F.coalesce(col.cast(ty), F.raise_error(err_msg))
df.select(col).show()
The variant below is for the case where NULL input values are acceptable (only failed casts of non-null values raise an error):
col = (F.when(col.isNotNull(),
              F.coalesce(col.cast(ty), F.raise_error(err_msg)))
       .otherwise(col.cast(ty)))
df.select(col).show()
Spark returns NULL when pyspark.sql.Column.cast fails. When the cast expression in the first argument of coalesce evaluates to NULL, coalesce falls through to its second argument, which is raise_error, and the exception is thrown.
Additionally, pyspark.sql.functions.raise_error accepts a single argument that can be either a Column or a literal str, so you can customize the error message to include the offending data.
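Putting the pieces together, here is a minimal sketch applied to the ShortType case from the question; the sample data, column name, and message are only illustrative, and raise_error requires Spark 3.1 or later:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import ShortType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("1",), ("99999",)], ["value"])

# Build the message as a Column so the offending value appears in the error
err_msg = F.concat(F.lit("cannot cast to short: "), F.col("value"))

checked = F.coalesce(F.col("value").cast(ShortType()), F.raise_error(err_msg))
# "1" casts fine; "99999" is out of range, so its cast yields null,
# coalesce falls through to raise_error, and the action fails with the message
df.select(checked.alias("value_short")).show()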
Upvotes: 2
Reputation: 1962
Spark fails silently if pyspark.sql.Column.cast fails: the values that cannot be cast simply become NULL. You have a couple of options to work around this:
1. Read the data through a source that supports mode=FAILFAST together with an explicit schema, e.g. CSV, such as:
df = spark.createDataFrame([(1,0,0,2),(1,1,1,1)],['c1','c2','c3','c4'])
df.toPandas().to_csv("./test.csv")
spark.read.csv("./test.csv").show()
+----+---+---+---+---+
| _c0|_c1|_c2|_c3|_c4|
+----+---+---+---+---+
|null| c1| c2| c3| c4|
| 0| 1| 0| 0| 2|
| 1| 1| 1| 1| 1|
+----+---+---+---+---+
Running spark.read.schema("_c0 INT, _c1 INT, _c2 INT, _c3 INT, _c4 INT").option("mode", "failfast").csv("./test.csv").show()
throws:
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST.
because the values in _c1 through _c4 of the first row are the header strings (header is false by default, so the header row is treated as data). The root cause appears deeper in the stacktrace: Caused by: java.lang.NumberFormatException: For input string: "_c1". To compare, running
spark.read.schema("_c0 INT, c1 INT, c2 INT, c3 INT, c4 INT").option("mode", "ignore").csv("./test.csv").show()
+----+----+----+----+----+
| _c0| c1| c2| c3| c4|
+----+----+----+----+----+
|null|null|null|null|null|
| 0| 1| 0| 0| 2|
| 1| 1| 1| 1| 1|
+----+----+----+----+----+
But it logs the following warning: WARN ParseMode: ignore is not a valid parse mode. Using PERMISSIVE.
2. Use a UDF to do the cast (preferably a pandas_udf, since it's vectorized). Here you run the risk of hard-to-debug type-matching errors, because you're matching the types Python/Pandas use against the JVM types PySpark uses. An example of this would be:
import pyspark.sql.functions as f
df2 = spark.createDataFrame([("a",0,0,2),("b",1,1,1)],['c1','c2','c3','c4'])
df2.show()
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
| a| 0| 0| 2|
| b| 1| 1| 1|
+---+---+---+---+
@f.pandas_udf("long")
def my_cast(column):
    return column.astype("int64")

df2.select(my_cast(f.col("c1"))).show()
This action will throw: ValueError: invalid literal for int() with base 10: 'b'
Upvotes: 0
Reputation: 6998
Spark does not throw if a cast goes wrong.
As a custom approach to catch those errors, you could write a UDF that throws if the cast results in null. This hurts the performance of your script, though, since Spark cannot optimise UDF executions.
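A rough sketch of what such a UDF could look like for the ShortType case; the function name and message are illustrative, it assumes the question's DataFrame has a string column named value, and the range check is done in plain Python rather than by Spark's cast:
import pyspark.sql.functions as F
from pyspark.sql.types import ShortType

@F.udf(ShortType())
def cast_short_or_fail(value):
    # Runs as plain Python per value, so Spark cannot optimise it
    if value is None:
        return None
    v = int(value)  # raises ValueError for non-numeric strings
    if not -32768 <= v <= 32767:
        raise ValueError("value out of ShortType range: {}".format(value))
    return v

df.select(cast_short_or_fail(F.col("value")).alias("value_short")).show()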
Upvotes: 4