Mark Coletti

Reputation: 305

"Cannot pickle files that are not opened for reading" using Client.map()

I am trying to use dask.distributed to concurrently update a PostgreSQL database with content from several CSV files. Ideally, we'd distribute the CSV files among N workers, and each worker would insert its CSV file's content into the database. However, we get a "Cannot pickle files that are not opened for reading" exception when using Client.map() to distribute the tasks to the workers.

This is a condensed version of the code:

import csv
from pathlib import Path

from dask.distributed import Client, as_completed


def _work(csv_path):
    db = Database()  # encapsulates interaction w/ PostgreSQL database
    db.open()

    count = 0

    with csv_path.open('r') as csv_file:
        reader = csv.DictReader(csv_file)

        for record in reader:
            db.insert(record)
            count += 1

    db.close()

    return count


client = Client(processes=False)

csv_files = Path('/data/files/').glob('*.csv')

csv_futures = client.map(_work, csv_files)  # error occurs here

for finished in as_completed(csv_futures):
    count = finished.result()
    print(count)

Based on related Stack Overflow and GitHub issues, I verified that cloudpickle can successfully serialize and deserialize both the function and its arguments:

cloudpickle.loads(cloudpickle.dumps(_work))
Out[69]: <function _work(csv_path)>

and

files = list(Path('/data/files/').glob('*.csv'))
files
Out[73]: 
[PosixPath('/data/files/208.csv'),
 PosixPath('/data/files/332.csv'),
 PosixPath('/data/files/125.csv'),
 PosixPath('/data/files/8.csv')]
cloudpickle.loads(cloudpickle.dumps(files))
Out[74]: 
[PosixPath('/data/files/208.csv'),
 PosixPath('/data/files/332.csv'),
 PosixPath('/data/files/125.csv'),
 PosixPath('/data/files/8.csv')]

So, the problem is elsewhere.

Upvotes: 1

Views: 1253

Answers (1)

Mark Coletti

Reputation: 305

The exact exception was this:

  File "/Users/may/anaconda/envs/eagle-i/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 841, in save_file
    raise pickle.PicklingError("Cannot pickle files that are not opened for reading: %s" % obj.mode)
_pickle.PicklingError: Cannot pickle files that are not opened for reading: a

Stepping through with the debugger, I was curious what obj actually was; it turned out to be this:

<_io.TextIOWrapper name='/tmp/logs/ei_sched.log' mode='a' encoding='UTF-8'>

In the code snippet given above, I had left out the calls to our logger, and that logger is what cloudpickle was complaining about. The logging was a leftover from before I tried to use dask to parallelize this code. Once I removed the logging calls from the function passed to Client.map(), things worked as expected.
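To make the failure mode concrete, here is a minimal sketch of how the same error can be triggered (the file path is made up, and this assumes the function is defined in __main__ or interactively, so cloudpickle serializes it by value along with the globals it references):

import cloudpickle

# Hypothetical stand-in for the logger's underlying file: an open,
# append-mode handle that the task function references.
log_file = open('/tmp/example.log', 'a')

def work():
    log_file.write('did some work\n')

# cloudpickle walks the function's referenced globals, finds the open
# append-mode file, and raises (in the cloudpickle version from the
# traceback above):
#   PicklingError: Cannot pickle files that are not opened for reading: a
cloudpickle.dumps(work)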

As an aside, this was a good catch by cloudpickle, because logging to a single shared file should not be done from dask workers.
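If logging from the workers is still wanted, one option is to configure a per-process logger inside the task function, so that each worker writes to its own file and no open handle exists at serialization time. A sketch (the logger name and log path are made up, and this is not the approach used in the original code):

import logging
import os

def _work(csv_path):
    # Create (or fetch) a per-process logger inside the task, so nothing
    # holding an open file handle gets pickled along with the function.
    logger = logging.getLogger(f'csv-loader-{os.getpid()}')
    if not logger.handlers:
        logger.addHandler(
            logging.FileHandler(f'/tmp/logs/worker-{os.getpid()}.log'))
        logger.setLevel(logging.INFO)

    logger.info('processing %s', csv_path)
    # ... insert the CSV records into the database as before ...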

Upvotes: 2
