Reputation: 646
I am trying to invoke a UDF that uses a broadcasted object in PySpark.
Here is a minimal example that reproduces the situation and error:
import pyspark.sql.functions as sf
from pyspark.sql.types import LongType

class SquareClass:
    def compute(self, n):
        return n ** 2

square = SquareClass()
square_sc = sc.broadcast(square)

def f(n):
    return square_sc.value.compute(n)

numbers = sc.parallelize([{'id': i} for i in range(10)]).toDF()
f_udf = sf.udf(f, LongType())
numbers.select(f_udf(numbers.id)).show(10)
The stacktrace and error message that this snippet produces:
Traceback (most recent call last)
<ipython-input-75-6e38c014e4b2> in <module>()
13 f_udf = sf.udf(f, LongType())
14
---> 15 numbers.select(f_udf(numbers.id)).show(10)
/usr/hdp/current/spark-client/python/pyspark/sql/dataframe.py in show(self, n, truncate)
255 +---+-----+
256 """
--> 257 print(self._jdf.showString(n, truncate))
258
259 def __repr__(self):
/usr/local/lib/python3.5/dist-packages/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id,
<snip>
An error occurred while calling o938.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 49.0 failed 1 times, most recent failure: Lost task 1.0 in stage 49.0 (TID 587, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
Upvotes: 0
Views: 719
Reputation: 5870
One thing you can do is keep the class in a separate module and add that module to the SparkContext.
class_module.py
class SquareClass:
    def compute(self, n):
        return n ** 2
pyspark-shell
import pyspark.sql.functions as sf
from pyspark.sql.types import LongType
from class_module import SquareClass

# addPyFile ships the module and puts it on the workers' sys.path,
# so they can import SquareClass when deserializing the broadcast.
sc.addPyFile('class_module.py')

square = SquareClass()
square_sc = sc.broadcast(square)

def f(n):
    return square_sc.value.compute(n)

f_udf = sf.udf(f, LongType())
numbers = sc.parallelize([{'id': i} for i in range(10)]).toDF()
numbers.select(f_udf(numbers.id)).show(10)
+-----+
|f(id)|
+-----+
| 0|
| 1|
| 4|
| 9|
| 16|
| 25|
| 36|
| 49|
| 64|
| 81|
+-----+
Upvotes: 1
Reputation: 10086
When you call the attributes of square_sc, the workers have to deserialize an instance of SquareClass, but the module defining that class is not present on the workers.
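A small standalone illustration of why this fails, using plain pickle outside of Spark (pickle is what Spark uses to ship the broadcast value): an instance is serialized only as a reference to its class's module and name, not the class's code, so the receiving side must be able to import that class.

```python
import pickle

class SquareClass:
    def compute(self, n):
        return n ** 2

square = SquareClass()
payload = pickle.dumps(square)

# The payload stores the class *name*, not its code; a worker that
# cannot import SquareClass will fail to unpickle this.
print(b"SquareClass" in payload)   # True
print(pickle.loads(payload).compute(4))   # 16, because the class is importable here
```

On a Spark worker the unpickling step raises the PythonException seen in the question, because no importable definition of SquareClass exists there.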
If you want to use a Python package, class, or function in a UDF, the workers must be able to access it. You can achieve this by putting the code in a Python script and deploying it with --py-files when running spark-submit or pyspark.
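As a sketch of that deployment step, assuming the class lives in class_module.py and the driver script is main.py (both file names are placeholders for your own):

```shell
# Ship the module to every worker along with the job.
spark-submit --py-files class_module.py main.py

# The same flag works for an interactive session:
pyspark --py-files class_module.py
```

Files passed via --py-files are placed on the workers' PYTHONPATH, so the import succeeds when the broadcast value is deserialized.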
Upvotes: 2