kyrre

Reputation: 646

Error when calling UDF using broadcasted objects in PySpark

I am trying to invoke a UDF that uses a broadcasted object in PySpark.

Here is a minimal example that reproduces the situation and error:

import pyspark.sql.functions as sf
from pyspark.sql.types import LongType


class SquareClass:
    def compute(self, n):
        return n ** 2


square = SquareClass()
square_sc = sc.broadcast(square)  # 'sc' is the SparkContext provided by the pyspark shell

def f(n):
    return square_sc.value.compute(n)  

numbers = sc.parallelize([{'id': i} for i in range(10)]).toDF()
f_udf = sf.udf(f, LongType())  

numbers.select(f_udf(numbers.id)).show(10)

The stack trace and error message that this snippet produces:

Traceback (most recent call last)
<ipython-input-75-6e38c014e4b2> in <module>()
     13 f_udf = sf.udf(f, LongType())
     14 
---> 15 numbers.select(f_udf(numbers.id)).show(10)

/usr/hdp/current/spark-client/python/pyspark/sql/dataframe.py in show(self, n, truncate)
    255         +---+-----+
    256         """
--> 257         print(self._jdf.showString(n, truncate))
    258 
    259     def __repr__(self):

/usr/local/lib/python3.5/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, 

<snip>

An error occurred while calling o938.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 49.0 failed 1 times, most recent failure: Lost task 1.0 in stage 49.0 (TID 587, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

Upvotes: 0

Views: 719

Answers (2)

Suresh

Reputation: 5870

One thing you can do is keep the class in a separate module and add that module to the SparkContext.

class_module.py

class SquareClass:
    def compute(self, n):
        return n ** 2

pyspark-shell 

    import pyspark.sql.functions as sf
    from pyspark.sql.types import LongType
    from class_module import SquareClass

    sc.addFile('class_module.py')

    square = SquareClass()
    square_sc = sc.broadcast(square) 
    def f(n):
        return square_sc.value.compute(n)

    f_udf = sf.udf(f, LongType())
    numbers = sc.parallelize([{'id': i} for i in range(10)]).toDF()
    numbers.select(f_udf(numbers.id)).show(10)
    +-----+
    |f(id)|
    +-----+
    |    0|
    |    1|
    |    4|
    |    9|
    |   16|
    |   25|
    |   36|
    |   49|
    |   64|
    |   81|
    +-----+
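
Note: sc.addPyFile('class_module.py') is a closely related alternative to sc.addFile here; it ships the file in the same way and also adds it to the executors' Python path, so the module can be imported on the workers.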

Upvotes: 1

MaFF

Reputation: 10086

When calling attributes of square_sc.value, the workers need the definition of SquareClass to unpickle the broadcast object, and that definition is not present on them.

If you want to use a Python package, class, or function in a UDF, the workers must have access to it. You can achieve this by putting the code in a Python script and deploying it with --py-files when running spark-submit or pyspark.
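
For example, a minimal sketch (class_module.py and my_job.py are placeholder file names):

    spark-submit --py-files class_module.py my_job.py

Spark then ships class_module.py to the executors, so the import succeeds on the workers.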

Upvotes: 2
