Ahmad Abdullah

Reputation: 63

PySpark Pipeline.fit(df) method gives PicklingError: Could not serialize object: ValueError: substring not found while using Elephas

I am working on the following dataset which is a Churn prediction problem: https://www.kaggle.com/jpacse/telecom-churn-new-cell2cell-dataset

I am using PySpark, Keras & Elephas to build a distributed neural network model with a PySpark pipeline.

When I fit the dataset with the pipeline, I get a pickling error. I am following this notebook to build the model: https://github.com/aviolante/pyspark_dl_pipeline/blob/master/pyspark_dl_pipeline.ipynb

The line in my code that raises the error is:

dl_pipeline.fit(train_data)
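
For context, the dl_pipeline from the notebook I am following is built roughly like this (a sketch only; the layer sizes, epochs, batch size, and feature count below are placeholders, not my exact values):

from keras.models import Sequential
from keras.layers import Dense
from keras import optimizers
from pyspark.ml import Pipeline
from elephas.ml_model import ElephasEstimator

input_dim = 56  # placeholder: number of assembled feature columns

# Plain Keras model, as in the notebook (layer sizes are placeholders)
model = Sequential()
model.add(Dense(64, input_dim=input_dim, activation='relu'))
model.add(Dense(2, activation='softmax'))

# Wrap the Keras model so it can be used as a Spark ML pipeline stage
estimator = ElephasEstimator()
estimator.setFeaturesCol("features")
estimator.setLabelCol("label")
estimator.set_keras_model_config(model.to_yaml())
estimator.set_optimizer_config(optimizers.serialize(optimizers.Adam()))
estimator.set_mode("synchronous")
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(["acc"])
estimator.set_epochs(10)
estimator.set_batch_size(64)
estimator.set_categorical_labels(True)
estimator.set_nb_classes(2)

dl_pipeline = Pipeline(stages=[estimator])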

train_data contains two columns: 'features' and 'label'. 'features' is assembled using VectorAssembler. Before assembling, all features were converted to float. 'label' contains 0 and 1 only.
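
For reference, the data preparation looks roughly like this (a sketch; df is the raw DataFrame loaded from the CSV, and the 'Churn' / 'CustomerID' column names are assumptions about the dataset):

from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler

feature_cols = [c for c in df.columns if c not in ("Churn", "CustomerID")]

# Cast every feature column to float before assembling
for c in feature_cols:
    df = df.withColumn(c, col(c).cast("float"))

# Assemble the feature columns into a single 'features' vector column
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = assembler.transform(df).select(
    "features", col("Churn").cast("int").alias("label")
)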

Following is the PicklingError:

>>> Fit model
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/pyspark/serializers.py", line 597, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 863, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 260, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 751, in save_tuple
    save(element)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 406, in save_function
    self.save_function_tuple(obj)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 549, in save_function_tuple
    save(state)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.6/pickle.py", line 808, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 657, in save_instancemethod
    self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
  File "/usr/lib/python3.6/pickle.py", line 610, in save_reduce
    save(args)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 736, in save_tuple
    save(element)
  File "/usr/lib/python3.6/pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
    save(state)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.6/pickle.py", line 808, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python3.6/pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
    save(state)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.6/pickle.py", line 805, in _batch_appends
    save(x)
  File "/usr/lib/python3.6/pickle.py", line 496, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/ops/resource_variable_ops.py", line 859, in __reduce__
    name=self._shared_name,
  File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/ops/variables.py", line 1140, in _shared_name
    return self.name[:self.name.index(":")]
ValueError: substring not found
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
/usr/local/lib/python3.6/dist-packages/pyspark/serializers.py in dumps(self, obj)
    596         try:
--> 597             return cloudpickle.dumps(obj, 2)
    598         except pickle.PickleError:

49 frames
ValueError: substring not found

During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
/usr/local/lib/python3.6/dist-packages/pyspark/serializers.py in dumps(self, obj)
    605                 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    606             cloudpickle.print_exec(sys.stderr)
--> 607             raise pickle.PicklingError(msg)
    608 
    609 

PicklingError: Could not serialize object: ValueError: substring not found

Any guidance would be appreciated. Thank you.

Upvotes: 2

Views: 732

Answers (2)

danielcahall

Reputation: 2752

This issue is also resolved in the latest 1.0.0 release (https://github.com/danielenricocahall/elephas/releases/tag/1.0.0): the tensorflow.keras import is now used throughout, rather than keras and tensorflow separately, which removes the incompatibility.
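
In practice this means installing the newer release and building the model with tensorflow.keras imports only, roughly like this (a sketch; the layer sizes and feature count are placeholders):

# pip install elephas>=1.0.0
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

input_dim = 56  # placeholder: number of assembled feature columns

# Build the model with tensorflow.keras only; do not mix in the standalone keras package
model = Sequential([
    Dense(64, activation="relu", input_shape=(input_dim,)),
    Dense(2, activation="softmax"),
])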

Upvotes: 1

Ahmad Abdullah

Reputation: 63

The solution that worked for me is described here:

https://github.com/maxpumperla/elephas/issues/151

I downgraded my Keras and TensorFlow versions using the following commands:

!pip install -q keras==2.2.4
!pip install -q tensorflow==1.14.0

The pickling error was gone after this.
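
To confirm the downgrade took effect (in Colab a runtime restart may be needed first), the installed versions can be checked like this:

import keras
import tensorflow as tf

# Should print 2.2.4 and 1.14.0 after the downgrade
print(keras.__version__)
print(tf.__version__)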

Upvotes: 2
