Praveenks

Reputation: 1494

Pyspark UDF function is throwing an error

I am trying to compute the difference between two timestamp columns using the different methods available in Spark. I get the expected result with Spark SQL and with a plain Python function, but when I register the function as a UDF, it throws an error.

Data:

id|end_date|start_date|location
1|2015-10-14 00:00:00|2015-09-14 00:00:00|CA-SF
2|2015-10-15 01:00:20|2015-08-14 00:00:00|CA-SD
3|2015-10-16 02:30:00|2015-01-14 00:00:00|NY-NY
4|2015-10-17 03:00:20|2015-02-14 00:00:00|NY-NY
5|2015-10-18 04:30:00|2014-04-14 00:00:00|CA-SD

Using SparkSQL: Works Fine !!

data.createOrReplaceTempView("data_tbl")
query = "SELECT id, end_date, start_date,\
        datediff(end_date,start_date) as dtdiff FROM data_tbl"

spark.sql(query).show()

Using Python Function: Works Fine !!

from pyspark.sql.functions import datediff

def get_diff(x, y):
    result = datediff(x,y)
    return result

data.withColumn('differ',get_diff('end_date','start_date')).show()

Result in both cases:

+---+-------------------+-------------------+--------+------+
| id|           end_date|         start_date|location|differ|
+---+-------------------+-------------------+--------+------+
|  1|2015-10-14 00:00:00|2015-09-14 00:00:00|   CA-SF|    30|
|  2|2015-10-15 01:00:20|2015-08-14 00:00:00|   CA-SD|    62|
|  3|2015-10-16 02:30:00|2015-01-14 00:00:00|   NY-NY|   275|
|  4|2015-10-17 03:00:20|2015-02-14 00:00:00|   NY-NY|   245|
|  5|2015-10-18 04:30:00|2014-04-14 00:00:00|   CA-SD|   552|
+---+-------------------+-------------------+--------+------+

Register function as UDF: Not Working !!

from pyspark.sql.functions import udf, datediff
get_diff_udf = udf(lambda x, y: datediff(x,y))
data.withColumn('differ',get_diff_udf('end_date','start_date')).show()

Error:

Py4JJavaError: An error occurred while calling o934.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 1 times, most recent failure: Lost task 0.0 in stage 18.0 (TID 18, localhost, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.

Upvotes: 2

Views: 3661

Answers (1)

pissall

Reputation: 7419

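You need to disable fork safety by setting the OBJC_DISABLE_INITIALIZE_FORK_SAFETY environment variable to YES. This variable is read by the Objective-C runtime on macOS, and setting it solved the same "Python worker failed to connect back" issue for me.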

You can include this in your script:

import os
os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'
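One detail that may matter is ordering: as far as I understand, the variable has to be in the environment before the Spark session starts, so that the processes Spark launches inherit it. Below is a minimal sketch of that ordering. The helper names disable_objc_fork_safety and build_session are hypothetical, and the macOS check is my own assumption about when the flag is relevant.

```python
import os
import platform


def disable_objc_fork_safety():
    """Set the fork-safety flag on macOS; return True if it was applied."""
    # Assumption: the flag only matters on macOS (Darwin), so skip it elsewhere.
    if platform.system() != "Darwin":
        return False
    os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
    return True


def build_session():
    """Create the Spark session only after the environment is prepared."""
    disable_objc_fork_safety()
    # Imported here so the flag is already set when Spark starts up.
    from pyspark.sql import SparkSession
    return SparkSession.builder.getOrCreate()
```

Calling build_session() in place of creating the session directly keeps the environment setup and the session creation in the right order.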

For more on fork safety and why this environment variable is needed, see:

Multiprocessing causes Python to crash and gives an error may have been in progress in another thread when fork() was called
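Separately from the worker error, the UDF in the question may still need a change: pyspark.sql.functions.datediff builds a Column expression on the driver, so it is not meant to be called on the per-row Python values that a UDF receives. The built-in datediff (as in the two working examples) is usually the better choice, but if a UDF is required, the body should be plain Python. A minimal sketch, assuming end_date and start_date are timestamp columns that arrive in the UDF as datetime objects; days_between and add_diff_column are hypothetical names:

```python
from datetime import datetime


def days_between(end, start):
    """Whole calendar days from start to end, ignoring the time of day."""
    # Null column values arrive as None inside a Python UDF.
    if end is None or start is None:
        return None
    # Compare dates only, so 01:00:20 on one side does not affect the count.
    return (end.date() - start.date()).days


def add_diff_column(df):
    """Add a 'differ' column computed by a Python UDF."""
    # Imported lazily so days_between can be tried without Spark installed.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    days_between_udf = udf(days_between, IntegerType())
    return df.withColumn("differ", days_between_udf("end_date", "start_date"))
```

With the sample data, add_diff_column(data).show() should be the UDF counterpart of the datediff examples in the question. If the columns are strings rather than timestamps, they would need to be cast or parsed first.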

Upvotes: 5
