Reputation: 65
Please find the code below:
import pandas as pd
from scipy.stats import norm
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf
import math
from pyspark.sql.functions import udf
from scipy.special import erfinv
# create sample data
df = spark.createDataFrame([
    (1, 0.008),
    (2, -1.23),
    (3, 4.56),
], ['id', 'value'])
def normal_cdf(x):
    # despite the name, sqrt(2) * erfinv(2x - 1) is the standard normal inverse CDF (probit)
    return math.sqrt(2) * erfinv(x * 2 - 1)
my_udf1 = udf(normal_cdf)
df1 = df.withColumn('prob', my_udf1(F.col('value')))
df1.show()
Error:
Py4JJavaError: An error occurred while calling o420.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 21, d2-td-cdh.boigroup.net, executor 17): org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  File "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 361, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 236, in read_udfs
    arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf)
  File "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 163, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 64, in read_command
    command = serializer._read_with_length(file)
  File "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/var/opt/teradata/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 577, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/usr/local/lib/python3.6/site-packages/numpy/core/__init__.py", line 131, in _ufunc_reconstruct
    mod = __import__(module, fromlist=[name])
ModuleNotFoundError: No module named 'scipy'
It works fine when executed separately:
n = math.sqrt(2) * erfinv(2 * 0.010 - 1)
print(n)
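(For reference, math.sqrt(2) * erfinv(2*p - 1) is exactly the standard normal inverse CDF, so the norm import at the top gives a quick cross-check:)

# equivalent check via scipy.stats.norm, imported above
print(norm.ppf(0.010))  # same value as math.sqrt(2) * erfinv(2*0.010 - 1)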
I have SciPy installed on my system. What is the issue here?
Upvotes: 0
Views: 252
Reputation: 75150
This is because some of the dtypes returned by NumPy are not compatible with Spark; you should also replace np.nan with null. Try the code below:
import pyspark.sql.types as T  # needed only for the DoubleType variant below

def normal_cdf(x):
    val = math.sqrt(2) * erfinv(x * 2 - 1)
    # cast the NumPy scalar to a plain Python float; map NaN to None (null in Spark)
    return float(val) if pd.notna(val) else None

my_udf1 = udf(normal_cdf)
# my_udf1 = udf(normal_cdf, T.DoubleType())  # to return a double instead of the default string
df1 = df.withColumn('prob', my_udf1(F.col('value')))
df1.show()
+---+-----+-------------------+
| id|value| prob|
+---+-----+-------------------+
| 1|0.008|-2.4089155458154616|
| 2|-1.23| null|
| 3| 4.56| null|
+---+-----+-------------------+
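As a side note, the same conversion can be vectorized with a scalar pandas_udf, which the question already imports. This is only a minimal sketch, assuming SciPy is importable on the executors; the name normal_cdf_pudf is illustrative:

import numpy as np

@pandas_udf('double')  # scalar pandas UDF returning a double column
def normal_cdf_pudf(s):
    # erfinv is a ufunc, so it evaluates the whole pandas Series at once
    return pd.Series(np.sqrt(2.0) * erfinv(2.0 * s - 1.0))

df2 = df.withColumn('prob', normal_cdf_pudf(F.col('value')))

Note that out-of-range inputs come back here as NaN doubles rather than null; wrap the column with F.when(F.isnan('prob'), None).otherwise(F.col('prob')) if you need null instead.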
Upvotes: 1