Reputation: 305
I am trying to use dask.distributed to concurrently update a PostgreSQL database with content from several CSV files. Ideally, we would distribute the CSV files among N workers, where each worker inserts the contents of one CSV file into the database. However, Client.map() raises a Cannot pickle files that are not opened for reading exception when distributing tasks to the workers.
This is a condensed version of the code:
import csv
from pathlib import Path

from dask.distributed import Client, as_completed

def _work(csv_path):
    db = Database()  # encapsulates interaction w/ PostgreSQL database
    db.open()
    count = 0
    with csv_path.open('r') as csv_file:
        reader = csv.DictReader(csv_file)
        for record in reader:
            db.insert(record)
            count += 1
    db.close()
    return count

client = Client(processes=False)
csv_files = Path('/data/files/').glob('*.csv')
csv_futures = client.map(_work, csv_files)  # error occurs here

for finished in as_completed(csv_futures):
    count = finished.result()
    print(count)
Based on related Stack Overflow and GitHub issues, I confirmed that cloudpickle can successfully serialize and deserialize both the function and its arguments:
cloudpickle.loads(cloudpickle.dumps(_work))
Out[69]: <function _work(csv_path)>
and
files = list(Path('/data/files/').glob('*.csv'))
files
Out[73]:
[PosixPath('/data/files/208.csv'),
PosixPath('/data/files/332.csv'),
PosixPath('/data/files/125.csv'),
PosixPath('/data/files/8.csv')]
cloudpickle.loads(cloudpickle.dumps(files))
Out[74]:
[PosixPath('/data/files/208.csv'),
PosixPath('/data/files/332.csv'),
PosixPath('/data/files/125.csv'),
PosixPath('/data/files/8.csv')]
So, the problem is elsewhere.
Upvotes: 1
Views: 1253
Reputation: 305
The exact exception was this:
File "/Users/may/anaconda/envs/eagle-i/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 841, in save_file
raise pickle.PicklingError("Cannot pickle files that are not opened for reading: %s" % obj.mode)
_pickle.PicklingError: Cannot pickle files that are not opened for reading: a
Stepping through with the debugger, I was curious what obj was; it turned out to be this:
<_io.TextIOWrapper name='/tmp/logs/ei_sched.log' mode='a' encoding='UTF-8'>
In the condensed code snippet above, I left out the calls to our logger, and that is what cloudpickle was complaining about. The logging was a leftover artifact from before we tried to parallelize this functionality with dask. Once I removed the logging calls from the function passed to Client.map(), things worked as expected.
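For context, the failure is easy to reproduce in isolation: cloudpickle can serialize files opened for reading, but not files opened for writing or appending, which is exactly what a logging FileHandler holds. A minimal sketch (the path is illustrative, and the exception type may differ in newer cloudpickle versions than the one in the traceback above):

import pickle
import cloudpickle

# A file handle opened for appending, like the one held by our logger's
# FileHandler, cannot be pickled:
log_file = open('/tmp/example.log', 'a')
try:
    cloudpickle.dumps(log_file)
except (pickle.PicklingError, TypeError) as exc:
    print(exc)  # e.g. Cannot pickle files that are not opened for reading: a
finally:
    log_file.close()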
As an aside, this was a good catch by cloudpickle, because logging to a single shared file should not be done from multiple dask workers in the first place; a worker-side pattern is sketched below.
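If per-task logging is still needed, one pattern that sidesteps the serialization problem is to look up the logger inside the task function, so no file-backed Logger is captured among the globals that cloudpickle serializes with the function. A minimal sketch using the standard logging module (the logger name and the body are illustrative, not our actual code):

import logging

def _work(csv_path):
    # Look up the logger inside the task rather than referencing a
    # module-level logger: cloudpickle then serializes only the function,
    # not a Logger whose FileHandler holds an open append-mode file.
    logger = logging.getLogger('csv_loader')  # illustrative name
    logger.info('processing %s', csv_path)
    # ... CSV parsing and db.insert() calls as in the original _work() ...
    return 0

With this pattern, the handlers configured on each worker decide where its records go (for example, a per-worker file or dask's own worker logs), which also avoids concurrent appends to a single shared log file.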
Upvotes: 2