Bogdan MITREA

Reputation: 101

Spark DataFrame in Python - execution stuck when using UDFs

I have a Spark job written in Python which reads data from CSV files using the Databricks CSV reader.

I want to convert some columns from string to double by applying a UDF which also changes the floating-point separator.

from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

# `df`, `columns` and `decimal_separator` are defined elsewhere in the job.
def _to_float(decimal_separator, decimal_str):
    # Swap the custom separator (e.g. ',') for '.' before parsing;
    # blank strings become None, non-strings pass through unchanged.
    if isinstance(decimal_str, (str, unicode)):  # Python 2: str or unicode
        return (None if len(decimal_str.strip()) == 0
                else float(decimal_str.replace(decimal_separator, '.')))
    else:
        return decimal_str

convert_udf = F.udf(
    lambda decimal_str: _to_float(decimal_separator, decimal_str),
    returnType=FloatType())

for name in columns:
    df = df.withColumn(name, convert_udf(df[name]))

The Spark job gets stuck when the UDF is called. I tried returning a fixed double value from the _to_float function, without success. It looks like something is wrong between the UDF and the DataFrame created via the SQL context.

Upvotes: 1

Views: 1670

Answers (1)

zero323

Reputation: 330063

Long story short, don't use Python UDFs (or UDFs in general) unless necessary:

  • they are inefficient due to the full round trip through the Python interpreter
  • they cannot be optimized by Catalyst
  • they create long lineages when applied iteratively (see the sketch after this list)
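
To illustrate the last point, here is a minimal sketch (assuming a Spark 2.x local session; on 1.x you would build the DataFrame from a SQLContext instead, and the transformation here is just an arbitrary placeholder) comparing the plans produced by a withColumn loop and a single select:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("1,5", "2,5")], ["a", "b"])

# Each withColumn call wraps the previous plan in another Project node.
looped = df
for name in looped.columns:
    looped = looped.withColumn(name, F.upper(looped[name]))

# A single select builds one flat projection instead.
single = df.select([F.upper(df[c]).alias(c) for c in df.columns])

# Compare the analyzed logical plans: the loop version nests one
# Project per iteration before the optimizer collapses them.
looped.explain(True)
single.explain(True)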

For simple operations like this one, just use the built-in functions:

from pyspark.sql.functions import regexp_replace

decimal_separator = ","

# Replace the separator and cast in a single projection,
# leaving the remaining columns untouched.
exprs = [
    regexp_replace(c, decimal_separator, ".").cast("float").alias(c)
    if c in columns else c
    for c in df.columns
]

df.select(*exprs)
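
For reference, a quick end-to-end run on a toy DataFrame (the column names here are made up; assuming a Spark 2.x session, but the same expressions work with a 1.x SQLContext):

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("1,5", "a"), ("", "b")], ["price", "label"])

columns = ["price"]          # columns to convert
decimal_separator = ","

exprs = [
    regexp_replace(c, decimal_separator, ".").cast("float").alias(c)
    if c in columns else c
    for c in df.columns
]

# "1,5" becomes 1.5; the empty string casts to NULL; "label" is untouched.
df.select(*exprs).show()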

Upvotes: 3
