Alfredo Di Massimo

Reputation: 51

GridSearchCV & BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable

I'm running a GridSearchCV for NLP data, this is the code I'm using:

%%time
# Next we can specify the hyperparameters for each model
param_grid = [
    {
        'transformer': list_of_vecs,
        'scaler': [StandardScaler()],
        'model': [LogisticRegression()],
        'model__penalty': ['l1', 'l2'],
        'model__C': [0.0001, 0.001, 0.01, 0.1, 1, 10, 100, 1000]
    },
    {
        'transformer': list_of_vecs,
        'scaler': [StandardScaler()],
        'model': [DecisionTreeClassifier()],
        'model__max_depth': [2, 3, 4, 5, 6]
    }
]

# Train the GridSearch
grid = GridSearchCV(pipe, param_grid, cv=5, n_jobs=-1)

fitted_grid = grid.fit(X_train, y_train)

I had already run the GridSearch successfully once with fewer hyperparameters, just to make sure it would run. But after I added a few more model__ parameters, I suddenly started getting the error below, and it only appears after the code has been running for about an hour. Any idea how I can fix this?

exception calling callback for <Future at 0x1da7efdba60 state=finished raised BrokenProcessPool>
joblib.externals.loky.process_executor._RemoteTraceback:
"""
Traceback (most recent call last):
  File "C:\Users\Alfredo\anaconda3\lib\site-packages\joblib\externals\loky\process_executor.py", line 407, in _process_worker
  File "C:\Users\Alfredo\anaconda3\lib\multiprocessing\queues.py", line 117, in get
    res = self._recv_bytes()
  File "C:\Users\Alfredo\anaconda3\lib\multiprocessing\connection.py", line 221, in recv_bytes
  File "C:\Users\Alfredo\anaconda3\lib\multiprocessing\connection.py", line 323, in _recv_bytes
  File "C:\Users\Alfredo\anaconda3\lib\multiprocessing\connection.py", line 345, in _get_more_data
MemoryError
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\Alfredo\anaconda3\lib\site-packages\joblib\externals\loky\_base.py", line 625, in _invoke_callbacks
    callback(self)
  File "C:\Users\Alfredo\anaconda3\lib\site-packages\joblib\parallel.py", line 359, in __call__
    self.parallel.dispatch_next()
  File "C:\Users\Alfredo\anaconda3\lib\site-packages\joblib\parallel.py", line 794, in dispatch_next
    if not self.dispatch_one_batch(self._original_iterator):
  File "C:\Users\Alfredo\anaconda3\lib\site-packages\joblib\parallel.py", line 861, in dispatch_one_batch
    self._dispatch(tasks)
  File "C:\Users\Alfredo\anaconda3\lib\site-packages\joblib\parallel.py", line 779, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "C:\Users\Alfredo\anaconda3\lib\site-packages\joblib\_parallel_backends.py", line 531, in apply_async
    future = self._workers.submit(SafeFunction(func))
  File "C:\Users\Alfredo\anaconda3\lib\site-packages\joblib\externals\loky\reusable_executor.py", line 177, in submit
    return super(_ReusablePoolExecutor, self).submit(
  File "C:\Users\Alfredo\anaconda3\lib\site-packages\joblib\externals\loky\process_executor.py", line 1115, in submit
    raise self._flags.broken
joblib.externals.loky.process_executor.BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.

_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback:
"""
Traceback (most recent call last):
  File "C:\Users\Alfredo\anaconda3\lib\site-packages\joblib\externals\loky\process_executor.py", line 407, in _process_worker
  File "C:\Users\Alfredo\anaconda3\lib\multiprocessing\queues.py", line 117, in get
    res = self._recv_bytes()
  File "C:\Users\Alfredo\anaconda3\lib\multiprocessing\connection.py", line 221, in recv_bytes
  File "C:\Users\Alfredo\anaconda3\lib\multiprocessing\connection.py", line 323, in _recv_bytes
  File "C:\Users\Alfredo\anaconda3\lib\multiprocessing\connection.py", line 345, in _get_more_data
MemoryError
"""

The above exception was the direct cause of the following exception:

BrokenProcessPool                         Traceback (most recent call last)
<timed exec> in <module>

~\anaconda3\lib\site-packages\sklearn\model_selection\_search.py in fit(self, X, y, groups, **fit_params)
    889                 return results
    890
--> 891             self._run_search(evaluate_candidates)
    892
    893             # multimetric is determined here because in the case of a callable

~\anaconda3\lib\site-packages\sklearn\model_selection\_search.py in _run_search(self, evaluate_candidates)
   1390     def _run_search(self, evaluate_candidates):
   1391         """Search all candidates in param_grid"""
-> 1392         evaluate_candidates(ParameterGrid(self.param_grid))
   1393
   1394

~\anaconda3\lib\site-packages\sklearn\model_selection\_search.py in evaluate_candidates(candidate_params, cv, more_results)
    836                 )
    837
--> 838                 out = parallel(
    839                     delayed(_fit_and_score)(
    840                         clone(base_estimator),

~\anaconda3\lib\site-packages\joblib\parallel.py in __call__(self, iterable)
   1054
   1055             with self._backend.retrieval_context():
-> 1056                 self.retrieve()
   1057             # Make sure that we get a last message telling us we are done
   1058             elapsed_time = time.time() - self._start_time

~\anaconda3\lib\site-packages\joblib\parallel.py in retrieve(self)
    933             try:
    934                 if getattr(self._backend, 'supports_timeout', False):
--> 935                     self._output.extend(job.get(timeout=self.timeout))
    936                 else:
    937                     self._output.extend(job.get())

~\anaconda3\lib\site-packages\joblib\_parallel_backends.py in wrap_future_result(future, timeout)
    540         AsyncResults.get from multiprocessing."""
    541         try:
--> 542             return future.result(timeout=timeout)
    543         except CfTimeoutError as e:
    544             raise TimeoutError from e

~\anaconda3\lib\concurrent\futures\_base.py in result(self, timeout)
    443                 raise CancelledError()
    444             elif self._state == FINISHED:
--> 445                 return self.__get_result()
    446             else:
    447                 raise TimeoutError()

~\anaconda3\lib\concurrent\futures\_base.py in __get_result(self)
    388         if self._exception:
    389             try:
--> 390                 raise self._exception
    391             finally:
    392                 # Break a reference cycle with the exception in self._exception

~\anaconda3\lib\site-packages\joblib\externals\loky\_base.py in _invoke_callbacks(self)
    623         for callback in self._done_callbacks:
    624             try:
--> 625                 callback(self)
    626             except BaseException:
    627                 LOGGER.exception('exception calling callback for %r', self)

~\anaconda3\lib\site-packages\joblib\parallel.py in __call__(self, out)
    357         with self.parallel._lock:
    358             if self.parallel._original_iterator is not None:
--> 359                 self.parallel.dispatch_next()
    360
    361

~\anaconda3\lib\site-packages\joblib\parallel.py in dispatch_next(self)
    792
    793         """
--> 794         if not self.dispatch_one_batch(self._original_iterator):
    795             self._iterating = False
    796             self._original_iterator = None

~\anaconda3\lib\site-packages\joblib\parallel.py in dispatch_one_batch(self, iterator)
    859                 return False
    860             else:
--> 861                 self._dispatch(tasks)
    862                 return True
    863

~\anaconda3\lib\site-packages\joblib\parallel.py in _dispatch(self, batch)
    777         with self._lock:
    778             job_idx = len(self._jobs)
--> 779             job = self._backend.apply_async(batch, callback=cb)
    780             # A job can complete so quickly than its callback is
    781             # called before we get here, causing self._jobs to

~\anaconda3\lib\site-packages\joblib\_parallel_backends.py in apply_async(self, func, callback)
    529     def apply_async(self, func, callback=None):
    530         """Schedule a func to be run"""
--> 531         future = self._workers.submit(SafeFunction(func))
    532         future.get = functools.partial(self.wrap_future_result, future)
    533         if callback is not None:

~\anaconda3\lib\site-packages\joblib\externals\loky\reusable_executor.py in submit(self, fn, *args, **kwargs)
    175     def submit(self, fn, *args, **kwargs):
    176         with self._submit_resize_lock:
--> 177             return super(_ReusablePoolExecutor, self).submit(
    178                 fn, *args, **kwargs)
    179

~\anaconda3\lib\site-packages\joblib\externals\loky\process_executor.py in submit(self, fn, *args, **kwargs)
   1113         with self._flags.shutdown_lock:
   1114             if self._flags.broken is not None:
-> 1115                 raise self._flags.broken
   1116             if self._flags.shutdown:
   1117                 raise ShutdownExecutorError(

BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.

Upvotes: 1

Views: 2948

Answers (1)

Felipe De Morais

Reputation: 11

You can fix it by removing n_jobs=-1, although I am not sure how to fix it that way while still allowing parallel processing. Another thing you could try is setting pre_dispatch, which controls the number of jobs that get dispatched during parallel execution. The default value is 2 times n_jobs, so the queued tasks could be overloading your memory (note the root cause in your traceback is a MemoryError). I had a case like yours, and keeping n_jobs=-1 while setting pre_dispatch='1*n_jobs' worked for me.
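Applied to the grid search from your question, that would look like the following sketch (reusing the pipe, param_grid, X_train, and y_train you already defined):

# Limit dispatched tasks to one batch per worker so queued task data
# doesn't exhaust memory (the GridSearchCV default is '2*n_jobs')
grid = GridSearchCV(pipe, param_grid, cv=5, n_jobs=-1, pre_dispatch='1*n_jobs')

fitted_grid = grid.fit(X_train, y_train)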

Upvotes: 1
