Amy Jack

Reputation: 65

PySpark - erfinv function is not working properly

Please find the code below:

import math

import pandas as pd
from scipy.special import erfinv
from scipy.stats import norm

import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf, udf


# create sample data
df = spark.createDataFrame([
    (1, 0.008),
    (2, -1.23),
    (3, 4.56),
], ['id', 'value'])

def normal_cdf(x):
    # note: sqrt(2) * erfinv(2*x - 1) is the inverse normal CDF (norm.ppf), not the CDF
    return math.sqrt(2) * erfinv(x * 2 - 1)

my_udf1 = udf(normal_cdf)

df1 = df.withColumn('prob', my_udf1(F.col('value')))

df1.show()

Error:

Py4JJavaError: An error occurred while calling o420.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times,
most recent failure: Lost task 0.3 in stage 6.0 (TID 21, d2-td-cdh.boigroup.net, executor 17):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 361, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 236, in read_udfs
    arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf)
  File "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 163, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 64, in read_command
    command = serializer._read_with_length(file)
  File "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 577, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/usr/local/lib/python3.6/site-packages/numpy/core/__init__.py", line 131, in _ufunc_reconstruct
    mod = __import__(module, fromlist=[name])
ModuleNotFoundError: No module named 'scipy'

It works fine when executed separately:

n = math.sqrt(2) * erfinv(2 * 0.010 - 1)
print(n)

I have SciPy installed on my system. What is the issue here?

Upvotes: 0

Views: 252

Answers (1)

anky

Reputation: 75150

This happens because some of the dtypes returned by NumPy are not compatible with Spark. I also think you should replace np.nan with null. Try the code below:

import pyspark.sql.types as T  # only needed for the DoubleType variant below

def normal_cdf(x):
    val = math.sqrt(2) * erfinv(x * 2 - 1)
    # cast numpy.float64 to a plain Python float, and map NaN to None (null in Spark)
    return float(val) if pd.notna(val) else None

my_udf1 = udf(normal_cdf)
# my_udf1 = udf(normal_cdf, T.DoubleType())  # to return a double instead of a string

df1 = df.withColumn('prob', my_udf1(F.col('value')))

df1.show()

+---+-----+-------------------+
| id|value|               prob|
+---+-----+-------------------+
|  1|0.008|-2.4089155458154616|
|  2|-1.23|               null|
|  3| 4.56|               null|
+---+-----+-------------------+
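
For reference, the explicit float(...) cast is there because scipy.special.erfinv returns a numpy.float64 rather than a plain Python float, and NumPy scalar types do not always convert cleanly to Spark SQL types. A minimal sketch (plain Python, no Spark session needed) showing both the dtype issue and why rows 2 and 3 come back null:

import math
from scipy.special import erfinv

val = math.sqrt(2) * erfinv(0.008 * 2 - 1)
print(type(val))         # <class 'numpy.float64'> - the type Spark chokes on
print(type(float(val)))  # <class 'float'> - safe to return from the UDF

# values outside (0, 1) push the erfinv argument outside [-1, 1],
# so erfinv returns nan, which the pd.notna guard maps to None (null)
print(math.sqrt(2) * erfinv(-1.23 * 2 - 1))  # nan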

Upvotes: 1
