Reputation: 223
I have some code that reads several datasets, keyed by (call it) "year", and then does some joining on them. I am trying to speed up the code by parallelizing the "read" portion of the problem, which I did by writing the function below.
This code does start several processes in parallel, and each of them finishes quickly, but the overall runtime ends up being slower than simply doing the reads serially.
What am I doing wrong?
import timeit
from multiprocessing import Process, Queue

def parallelQueueRead():
    start_time = timeit.default_timer()
    myq = Queue()

    def reader(year, q):
        loc_start_time = timeit.default_timer()
        print("reading year %s" % (year))
        astore = store(year)
        df = astore.getAllData(TESTSPEC)
        astore.close()
        q.put((year, df))
        print("finished reading year %s ,took: %s" %
              (year, str(timeit.default_timer() - loc_start_time)))

    processes = [Process(target=reader, args=(y, myq)) for y in CHUNKS]
    for p in processes:
        p.start()
    # Drain the queue before joining: with large payloads, join() can
    # deadlock while a child is still flushing its result into the pipe.
    results = [myq.get() for _ in processes]
    for p in processes:
        p.join()
    results = sorted(results, key=lambda x: x[0])
    print("parallel read took: " + str(timeit.default_timer() - start_time))
Output:
reading year 2011
reading year 2012
reading year 2013
reading year 2014
reading year 2015
finished reading year 2011 ,took: 1.142295703291893
finished reading year 2014 ,took: 1.2605517469346523
finished reading year 2013 ,took: 1.2637327639386058
finished reading year 2012 ,took: 1.2874943045899272
finished reading year 2015 ,took: 1.7436037007719278
parallel read took: 5.500953913666308
Output from another routine that does the same thing serially in just one process:
serial read took: 5.3680868614465
Post-script 1
Just to clarify: the serial version is a simple for-loop:
results = []
for year in CHUNKS:
    astore = store(year)  # open the store for this year
    results.append(astore.getAllData(TESTSPEC))
    astore.close()
Post-script 2
On reading the documentation, I think the reason the parallel version is slow is the pickling of a large dataset (the DataFrame each reader produces): Queue.put() pickles the result inside the child, so that time is included in the time reported by each reader (and, in addition, the time taken to unpickle the results in the parent is included in the overall time).
This is really terrible news for me because it means that multiprocessing is not able to speed up the execution of my code.
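One rough way to check this hypothesis (a hedged sketch: df stands for one year's result, and pickle.dumps() only approximates what Queue.put() does internally):

import pickle
import timeit

# Queue.put() pickles the object before sending it through a pipe to
# the parent, so timing pickle.dumps() on one year's DataFrame gives a
# rough lower bound on that hidden per-reader cost.
start = timeit.default_timer()
payload = pickle.dumps(df, protocol=pickle.HIGHEST_PROTOCOL)
print("pickling took %.3f s for %d bytes"
      % (timeit.default_timer() - start, len(payload)))

If pickling takes a large fraction of each reader's reported time, the parallelism is mostly being spent on serialization rather than on reading.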
Upvotes: 2
Views: 503
Reputation: 17455
Depending on the structure of the data in df (the result of astore.getAllData(TESTSPEC)), you could try to use sharedctypes to store the collected data in shared memory. This method is mainly useful for 'POD' (plain-old-data) structures, i.e. data without any code or complex objects inside.
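As a minimal sketch of that idea (assuming each year's data can be flattened into a fixed-size numeric array; N and load_year_somehow are hypothetical placeholders, not part of the question's code):

import numpy as np
from multiprocessing import Process
from multiprocessing.sharedctypes import RawArray

# Hypothetical size of one year's data, flattened to doubles.
N = 1_000_000
buf = RawArray('d', N)  # lock-free shared buffer, inherited by the child

def reader(shared_buf):
    # Wrap the shared buffer as a numpy array and fill it in place;
    # nothing gets pickled on the way back to the parent.
    arr = np.frombuffer(shared_buf, dtype=np.float64)
    arr[:] = load_year_somehow()  # hypothetical loader returning N floats

p = Process(target=reader, args=(buf,))
p.start()
p.join()
result = np.frombuffer(buf, dtype=np.float64)  # parent sees the same memory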
I would also move the entire data processing into the children, and make sure that astore is actually capable of working in parallel without synchronization (or at least with minimal synchronization time) between its clients (the different processes).
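Here is a hedged sketch of that suggestion, reusing the question's store, TESTSPEC and CHUNKS; summarize stands for whatever per-year processing reduces the data to a small result:

from multiprocessing import Process, Queue

# Each child opens its own store, processes the year locally, and puts
# only a small summary on the queue, so the pickling cost stays
# negligible compared to shipping the whole DataFrame to the parent.
def worker(year, q):
    astore = store(year)
    df = astore.getAllData(TESTSPEC)
    astore.close()
    q.put((year, summarize(df)))  # hypothetical reducer: small object

q = Queue()
procs = [Process(target=worker, args=(y, q)) for y in CHUNKS]
for p in procs:
    p.start()
results = sorted(q.get() for _ in procs)
for p in procs:
    p.join()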
But certainly all these suggestions are based on common sense alone: without precise knowledge of your app's internals and accurate profiling, it is hard to say exactly what the best solution for you would be.
Upvotes: 1