Reputation: 29
We have a script that maps data into a dataframe (we're using pyspark). The data comes in as a string, and some other, sometimes expensive, stuff is done to it, but as part of the operation (calling withColumn) we cast it to its final data type.
I have a requirement to detect whether truncation occurred, but we don't want to fail if it does. We just want a number telling us how many rows in each translated column (there are about 300 columns) failed.
My first thought was to pass each column through a UDF which would do the test, and the output would be an array with the value and a flag indicating whether it passed the datatype checks. I'd then do 2 selections: one selects the raw values from the array, and the other aggregates the misses. But this seems like a sloppy solution. I'm fairly new to the pyspark/hadoop world... would love to know if there's a better (maybe standard?) way to do this.
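To make the idea concrete, here is a plain-Python sketch of the per-value check described above (the name `check_cast` and the `(value, passed)` pair shape are hypothetical; in Spark this function body would be wrapped in a UDF returning a struct, with one selection pulling the value field and another aggregating the flags):

```python
def check_cast(raw, caster):
    """Return (casted_value_or_None, passed_flag) for one raw string value.

    Hypothetical helper: `caster` is any callable like float or int.
    NULL inputs are treated as passing, since there is nothing to truncate.
    """
    if raw is None:
        return (None, True)
    try:
        return (caster(raw), True)
    except (ValueError, TypeError):
        # The cast failed; record the miss instead of raising.
        return (None, False)
```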
Upvotes: 1
Views: 3132
Reputation: 958
I think the UDF solution is not a bad idea; it's exactly what I did:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType, IntegerType

@udf(returnType=FloatType())
def float_cast(data):
    if data is not None:
        return float(data)

@udf(returnType=IntegerType())
def int_cast(data):
    if data is not None:
        return int(data)

def cast_safe(data, _type):
    # Dispatch to the matching casting UDF by type name
    cast = {
        "float": float_cast,
        "int": int_cast,
    }
    return cast[_type](data)

df = df.withColumn(col_name, cast_safe(df[col_name], col_type))
Any possible exceptions will be triggered when the DataFrame is materialized.
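Since the question asks not to fail on bad values, one variant (my assumption, not part of the answer above) is to swallow the parse error inside the function body so a bad row yields None instead of killing the job when the DataFrame is materialized. The plain-Python body would look like this, and would be wrapped with `@udf(returnType=FloatType())` exactly as above:

```python
def safe_float(data):
    """Return float(data), or None when the value cannot be parsed.

    Assumption: returning None (i.e. NULL in Spark) on failure matches
    the asker's "don't fail, just count" requirement, since the NULLs
    can then be counted per column.
    """
    if data is None:
        return None
    try:
        return float(data)
    except ValueError:
        # Bad value: produce NULL instead of raising at materialization.
        return None
```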
Upvotes: 0
Reputation: 35229
In the latest Spark versions, casting numbers in Spark doesn't fail and doesn't result in silent overflows: if the value is not properly formatted, or is too large to be accommodated by the target type, the result is NULL.
So all you have to do is a simple count of NULL values (Count number of non-NaN entries in each column of Spark dataframe with Pyspark) after the cast:
from pyspark.sql.functions import count

df = spark.createDataFrame(['132312312312312321312312', '123', '32'], 'string')
df_cast = df.withColumn('value_casted', df['value'].cast('integer'))

df_cast.select((
    # count('value') - count of NOT NULL values before
    # count('value_casted') - count of NOT NULL values after
    count('value') - count('value_casted')).alias('value_failed')
).show()
# +------------+
# |value_failed|
# +------------+
# | 1|
# +------------+
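With the ~300 columns mentioned in the question, the same pattern can be generated in a loop so all failure counts come back in a single aggregate pass. A sketch (the column names and the `_casted`/`_failed` suffixes are my assumptions, not from the answer above):

```python
# Stand-ins for the ~300 real column names.
cols = ['price', 'qty']

# One "count(before) - count(after)" expression per casted column,
# suitable for df_cast.selectExpr(*exprs) in a single action.
exprs = [
    f"count({c}) - count({c}_casted) AS {c}_failed"
    for c in cols
]
# df_cast.selectExpr(*exprs).show()
```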
Upvotes: 9