QPSK

Reputation: 29

How to test datatype conversion during casting

We have a script that maps data into a dataframe (we're using pyspark). The data comes in as a string, and some other, sometimes expensive, work is done to it, but as part of the operation (calling withColumn) we cast it to its final data type.
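Roughly, the cast step itself looks like this (the column name and target type here are made up for illustration):

from pyspark.sql.functions import col

# Illustrative only: 'amount' and 'integer' stand in for our real columns/types.
df = df.withColumn('amount', col('amount').cast('integer'))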

I have a requirement to tell whether truncation occurred, but we don't want to fail if it does. We just want a count of how many rows failed in each translated column (there are about 300 columns).

My first thought was to pass each column through a UDF that would do the test, with the output being an array containing the value and a flag indicating whether it passed the datatype check. I'd then do two selections: one selects the raw values from the array, and the other aggregates the misses. But this seems like a sloppy solution. I'm fairly new to the pyspark/hadoop world... would love to know if there's a better (maybe standard?) way to do this.
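To make the idea concrete, here's a rough sketch of that approach for a single made-up column called amount (a struct of value plus a pass flag rather than an array, since array elements share one type; all names are illustrative):

from pyspark.sql.functions import udf, col, when, sum as sum_
from pyspark.sql.types import StructType, StructField, IntegerType, BooleanType

# The checked column comes back as a struct: the casted value plus a pass flag.
@udf(returnType=StructType([
    StructField('value', IntegerType()),
    StructField('ok', BooleanType()),
]))
def checked_int_cast(s):
    if s is None:
        return (None, True)  # a NULL input isn't a cast failure
    try:
        return (int(s), True)
    except (TypeError, ValueError):
        return (None, False)

tmp = df.withColumn('amount_checked', checked_int_cast(col('amount')))

# Selection 1: pull the casted values back out of the struct.
values = tmp.withColumn('amount', col('amount_checked.value')).drop('amount_checked')

# Selection 2: aggregate the misses for the column.
misses = tmp.select(
    sum_(when(col('amount_checked.ok'), 0).otherwise(1)).alias('amount_failed')
)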

Upvotes: 1

Views: 3132

Answers (2)

Zach

Reputation: 958

I think the UDF solution is not a bad idea. It's exactly what I did:

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType, IntegerType

@udf(returnType=FloatType())
def float_cast(data):
    # float() raises ValueError on malformed input; NULLs pass through untouched.
    if data is not None:
        return float(data)


@udf(returnType=IntegerType())
def int_cast(data):
    # int() raises ValueError on malformed input; NULLs pass through untouched.
    if data is not None:
        return int(data)


def cast_safe(data, _type):
    # Dispatch to the right UDF based on the target type name.
    cast = {
        "float": float_cast,
        "int": int_cast,
    }
    return cast[_type](data)


df = df.withColumn(col_name, cast_safe(df[col_name], col_type))

Any possible exceptions will be triggered when the df is materialized (e.g. by show(), count(), or a write).
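If you'd rather record the failure than blow up at materialization time, one variation (just a sketch, not what I ran) is to swallow the error inside the UDF and return None, so failed rows come back as NULLs you can count afterwards:

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

@udf(returnType=FloatType())
def float_cast_or_null(data):
    # Return None instead of raising, so a failed cast shows up as a NULL.
    try:
        return float(data) if data is not None else None
    except (TypeError, ValueError):
        return None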

Upvotes: 0

Alper t. Turker

Reputation: 35229

In recent Spark versions, casting numbers doesn't fail and doesn't result in silent overflows: if the value is not properly formatted, or is too large to be accommodated by the target type, the result is NULL.

So all you have to do is a simple count of NULL values after the cast (see Count number of non-NaN entries in each column of Spark dataframe with Pyspark):

from pyspark.sql.functions import count

df = spark.createDataFrame(['132312312312312321312312', '123', '32'], 'string')
df_cast = df.withColumn('value_casted', df['value'].cast('integer'))

df_cast.select((
    # count('value')         - count of NOT NULL values before
    # count('value_casted')  - count of NOT NULL values after
    count('value') - count('value_casted')).alias('value_failed')
).show()
# +------------+
# |value_failed|
# +------------+
# |           1|
# +------------+
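With ~300 columns you can build the same expression for every column in a single select (a sketch, assuming each casted column is named <original>_casted):

from pyspark.sql.functions import count

failed_counts = df_cast.select([
    (count(c) - count(c + '_casted')).alias(c + '_failed')
    for c in ['value']  # in practice: the list of your ~300 original column names
])
failed_counts.show()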

Upvotes: 9
