Reputation: 29
When I set the value of local to 1, the program runs normally, but when I set it to 2, the following error is reported:
from pyspark import SparkContext

# Changing 1 to 2 will give you an error
sc = SparkContext("local[2]", "sort")

class MySort:
    def __init__(self, tup):
        self.tup = tup

    def __gt__(self, other):
        if self.tup[0] > other.tup[0]:
            return True
        elif self.tup[0] == other.tup[0]:
            if self.tup[1] >= other.tup[1]:
                return True
            else:
                return False
        else:
            return False

r1 = sc.parallelize([(1, 2), (2, 2), (2, 3), (2, 1), (1, 3)])
r2 = r1.sortBy(MySort)
print(r2.collect())
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "E:\spark2.3.1\spark-2.3.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 230, in main
  File "E:\spark2.3.1\spark-2.3.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 225, in process
  File "E:\spark2.3.1\spark-2.3.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 376, in dump_stream
    bytes = self.serializer.dumps(vs)
  File "E:\spark2.3.1\spark-2.3.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 555, in dumps
    return pickle.dumps(obj, protocol)
_pickle.PicklingError: Can't pickle : attribute lookup MySort on __main__ failed
Upvotes: 1
Views: 62
Reputation: 1497
It's a really interesting behavior of Spark; I did not know about it before. I think that when you use a single core, the class does not need to be pickled (pickling is needed to use the class in other places, i.e. on other workers). But you can still use a function (I assume you want to sort by the first two values):
key_func = lambda tup: tup[:2]
r1 = sc.parallelize([(1, 2), (2, 2), (2, 3), (2, 1), (1, 3)])
r2 = r1.sortBy(key_func)
print(r2.collect())
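A small variation of the same idea, using a named key function instead of the lambda (the name sort_key is mine, not from the original code), with the result I would expect for the sample data:

def sort_key(tup):
    # sort by the first element, then by the second
    return (tup[0], tup[1])

r1 = sc.parallelize([(1, 2), (2, 2), (2, 3), (2, 1), (1, 3)])
r2 = r1.sortBy(sort_key)
print(r2.collect())  # expected: [(1, 2), (1, 3), (2, 1), (2, 2), (2, 3)]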
Upvotes: 0
Reputation: 476
I think you need to put the class in its own file and pass that file to spark-submit with:
--py-files your_file.py
because Spark needs to ship this class to the other workers.
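A minimal sketch of that setup (the file names my_sort.py and driver.py are assumptions, not from the original post): the class lives in its own module, the driver imports it, and the module is shipped to the workers via --py-files or sc.addPyFile, so the pickled instances refer to my_sort.MySort instead of __main__.MySort:

# my_sort.py -- hypothetical module containing the comparator class
class MySort:
    def __init__(self, tup):
        self.tup = tup

    def __gt__(self, other):
        # order by the first element, then by the second (same logic as in the question)
        if self.tup[0] != other.tup[0]:
            return self.tup[0] > other.tup[0]
        return self.tup[1] >= other.tup[1]

# driver.py -- submitted with: spark-submit --py-files my_sort.py driver.py
from pyspark import SparkContext
from my_sort import MySort  # imported from a module, not defined in __main__

sc = SparkContext("local[2]", "sort")
sc.addPyFile("my_sort.py")  # alternative to --py-files when not using spark-submit

r1 = sc.parallelize([(1, 2), (2, 2), (2, 3), (2, 1), (1, 3)])
r2 = r1.sortBy(MySort)
print(r2.collect())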
Upvotes: 0