Kumar

Reputation: 87

Data frame cast not throwing overflow exception and producing null

from pyspark.sql.functions import *
from pyspark.sql.types import *

I am trying to cast a data frame column with df.column.cast(ShortType()), but when I insert the value 99999 it is converted to null without any error being thrown. Can you suggest an approach that throws an error during the conversion?
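For reference, a minimal snippet reproducing the silent-null behaviour described; the DataFrame and the string column value below are illustrative, not from the original post, and rely on the wildcard imports above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# 99999 is outside the ShortType range (-32768..32767), so the cast silently yields null
df = spark.createDataFrame([("1",), ("99999",)], ["value"])
df.withColumn("value_short", col("value").cast(ShortType())).show()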

Upvotes: 2

Views: 3995

Answers (3)

Teng Man Leong

Reputation: 21

In addition to using a custom type-checker UDF, you can make use of pyspark.sql.functions.raise_error. For example:

# assumes import pyspark.sql.functions as F; col, ty and err_msg are placeholders for your column, target type and error message
col = F.coalesce(col.cast(ty), F.raise_error(err_msg))
df.select(col).show()

The query below handles the case where NULL input is acceptable.

col = (F.when(col.isNotNull(),
                F.coalesce(col.cast(ty),F.raise_error(err_msg)))
        .otherwise(col.cast(ty)))
df.select(col).show()

Spark returns NULL when pyspark.sql.Column.cast fails. When the cast expression returns NULL as the first argument of coalesce, coalesce evaluates the second argument, which is raise_error, and an exception is thrown.

Additionally, pyspark.sql.functions.raise_error accepts a single argument of type Column or a literal str, so you can customize the error message to include the offending data.
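Putting it together, a runnable sketch of this approach (requires Spark 3.1+ for raise_error; the DataFrame, the column name value and the error message are illustrative, not from the question):

import pyspark.sql.functions as F

# "99999" overflows ShortType (max 32767), so the plain cast returns NULL
# and coalesce falls through to raise_error
df = spark.createDataFrame([("1",), ("99999",)], ["value"])

err_msg = F.concat(F.lit("cast to short failed for value: "), F.col("value"))
casted = F.coalesce(F.col("value").cast("short"), F.raise_error(err_msg))

df.select(casted.alias("value_short")).show()

The show() action then fails with an exception carrying the message built in err_msg, so the offending value is visible in the error.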

Upvotes: 2

Napoleon Borntoparty

Reputation: 1962

Spark fails silently if pyspark.sql.Column.cast fails, i.e. the values that cannot be cast become NULL. You have a couple of options to work around this:

  1. If you want to detect type problems at the point of reading from a file, you can read with a predefined (expected) schema and mode=failfast set, such as:
df = spark.createDataFrame([(1,0,0,2),(1,1,1,1)],['c1','c2','c3','c4'])
df.toPandas().to_csv("./test.csv")
spark.read.csv("./test.csv").show()
+----+---+---+---+---+
| _c0|_c1|_c2|_c3|_c4|
+----+---+---+---+---+
|null| c1| c2| c3| c4|
|   0|  1|  0|  0|  2|
|   1|  1|  1|  1|  1|
+----+---+---+---+---+

Running spark.read.schema("_c0 INT, _c1 INT, _c2 INT, _c3 INT, _c4 INT").option("mode", "failfast").csv("./test.csv").show() throws org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. because in the first row the values in _c1 through _c4 are strings (header=False by default, so the header row is treated as data). The root cause appears deeper in the stack trace: Caused by: java.lang.NumberFormatException: For input string: "_c1". For comparison, running

spark.read.schema("_c0 INT, c1 INT, c2 INT, c3 INT, c4 INT").option("mode", "ignore").csv("./test.csv").show()
+----+----+----+----+----+
| _c0|  c1|  c2|  c3|  c4|
+----+----+----+----+----+
|null|null|null|null|null|
|   0|   1|   0|   0|   2|
|   1|   1|   1|   1|   1|
+----+----+----+----+----+

But it will log the following warning: WARN ParseMode: ignore is not a valid parse mode. Using PERMISSIVE.

  2. Your second option is to use a UDF (or better, a pandas_udf, since it is vectorized). Here you run the risk of hard-to-debug type-matching errors, because you are matching the types Python/Pandas use against the JVM types PySpark uses. An example of this would be:
import pyspark.sql.functions as f

df2 = spark.createDataFrame([("a",0,0,2),("b",1,1,1)],['c1','c2','c3','c4'])
df2.show()
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  a|  0|  0|  2|
|  b|  1|  1|  1|
+---+---+---+---+


@f.pandas_udf("long")
def my_cast(column):
  return column.astype("int64")

df2.select(my_cast(f.col("c1"))).show()

This action will throw: ValueError: invalid literal for int() with base 10: 'b'

Upvotes: 0

Robert Kossendey

Reputation: 6998

Spark does not throw an exception if a cast goes wrong.

As a custom approach to catch those errors, you could write a UDF that throws if the cast produces null. This worsens the performance of your script though, since Spark cannot optimise UDF execution.
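A rough sketch of what such a UDF could look like (the function name, the column value, and the sample DataFrame are made up for illustration; the answer itself gives no code):

import pyspark.sql.functions as F
import pyspark.sql.types as T

@F.udf(T.ShortType())
def cast_to_short_or_fail(value):
    # Raise instead of silently returning null when the value does not fit into a short
    if value is None:
        return None
    v = int(value)
    if not -32768 <= v <= 32767:
        raise ValueError(f"value {value!r} does not fit into ShortType")
    return v

df = spark.createDataFrame([("1",), ("99999",)], ["value"])
df.withColumn("value_short", cast_to_short_or_fail("value")).show()

The exception from the Python worker surfaces once the show() action runs, wrapped by PySpark, instead of a silent null.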

Upvotes: 4
