My original question was about parallelism in Python. However, since the question remained unanswered, I deleted it and will try to summarize my conclusions here. Hopefully it will help someone...
In general there are two main ways to make your code run in parallel - using either the threading or the multiprocessing library.
According to many posts on stackoverflow.com, the threading library shares memory across threads, but because of the GIL only one thread executes Python bytecode at a time, so the threads effectively run on a single core. Therefore it can speed up your code mainly if the bottleneck is I/O operations. I am not sure if there are many real-life applications for the library...
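For illustration, a minimal sketch of an I/O-bound workload where a thread pool does help (the waiting is simulated with time.sleep, standing in for e.g. network requests; the names and numbers are made up):

import time as tm
from multiprocessing.dummy import Pool as ThreadPool  # thread-based pool with the same API as mp.Pool

def fetch(url):
    """ dummy I/O-bound task - pretend to wait for a remote server """
    tm.sleep(0.5)
    return url

if __name__ == '__main__':
    urls = ['url_' + str(idx) for idx in range(8)]
    start_time = tm.time()
    pool = ThreadPool(4)              # 4 threads wait concurrently
    results = pool.map(fetch, urls)   # roughly 1s instead of roughly 4s sequentially
    pool.close()
    pool.join()
    print(' elapsed time: ' + str(tm.time() - start_time) + 's')

Because the threads spend their time waiting rather than executing Python bytecode, the GIL does not get in the way here.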
If your code is CPU intensive (often called CPU-bound), the multiprocessing library could be the answer to your problem. The library spreads the work across separate processes running on individual cores. However, many people (including me) have observed that such multicore code can be significantly slower than its single-core counterpart. The issue is supposedly caused by the fact that individual processes cannot share memory directly - data is extensively pickled and copied between them, which creates quite an overhead. As my code below illustrates, the overhead depends hugely on the input data type. According to many, the problem is more pronounced on Windows than on Linux. I have to say that parallelism is my biggest Python disappointment - apparently Python was not designed with parallelism in mind...
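As a minimal sketch of the copying issue (my own illustration, not part of the original question): arguments handed to a Process are pickled and sent to the child on Windows (and copied on write after fork on Linux), so the child always works on its own copy and changes made there are never visible in the parent.

import numpy as np
import multiprocessing as mp

def worker(arr):
    """ the child works on its own copy of arr """
    arr[:] = 0.0   # modifies only the child's copy

if __name__ == '__main__':
    data = np.ones(5)
    p = mp.Process(target=worker, args=(data,))
    p.start()
    p.join()
    print(data)    # still all ones - the parent's array was never touched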
The first piece of code splits a pandas DataFrame between cores using Process.
import numpy as np
import math as mth
import pandas as pd
import time as tm
import multiprocessing as mp

def bnd_calc_npv_dummy(bnds_info, core_idx, npv):
    """ multiple core dummy valuation function (based on single core function) """
    bnds_no = len(bnds_info)
    tm.sleep(0.0001 * bnds_no)
    npv[core_idx] = np.array(bnds_info['npv'])

def split_bnds_info(bnds_info, cores_no):
    """ cut dataframe with bond definitions into pieces - one piece per core """
    bnds_info_mp = []
    bnds_no = len(bnds_info)
    batch_size = mth.ceil(np.float64(bnds_no) / cores_no)  # number of bonds allocated to one core
    # split dataframe among cores
    for idx in range(cores_no):
        lower_bound = int(idx * batch_size)
        upper_bound = int(np.min([(idx + 1) * batch_size, bnds_no]))
        bnds_info_mp.append(bnds_info[lower_bound : upper_bound].reset_index().copy())
    # return list of dataframes
    return bnds_info_mp

def bnd_calc_npv(bnds_info, cores_no):
    """ dummy valuation function running multicore """
    manager = mp.Manager()
    npv = manager.dict()
    bnds_info_mp = split_bnds_info(bnds_info, cores_no)
    processes = [mp.Process(target=bnd_calc_npv_dummy, args=(bnds_info_mp[core_idx], core_idx, npv)) for core_idx in range(cores_no)]
    [process.start() for process in processes]
    [process.join() for process in processes]
    # return NPV of individual bonds
    return np.hstack(npv.values())

if __name__ == '__main__':
    # create dummy dataframe
    bnds_no = 1200  # number of dummy bonds in the sample
    bnds_info = {'currency_name' : 'EUR', 'npv' : 100}
    bnds_info = pd.DataFrame(bnds_info, index = range(1))
    bnds_info = pd.concat([bnds_info] * bnds_no, ignore_index = True)

    # one core
    print("ONE CORE")
    start_time = tm.time()
    bnds_no = len(bnds_info)
    tm.sleep(0.0001 * bnds_no)
    npv = np.array(bnds_info['npv'])
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # two cores
    print("TWO CORES")
    cores_no = 2
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # three cores
    print("THREE CORES")
    cores_no = 3
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # four cores
    print("FOUR CORES")
    cores_no = 4
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')
The second piece of code is the same as the one before - the only difference is that this time we use a numpy array instead of a pandas DataFrame, and the performance difference is huge (compare the run-time changes for a single core with the run-time changes for multiple cores).
import numpy as np
import math as mth
import time as tm
import multiprocessing as mp

def bnd_calc_npv_dummy(bnds_info, core_idx, npv):
    """ multiple core dummy valuation function (based on single core function) """
    bnds_no = len(bnds_info)
    tm.sleep(0.0001 * bnds_no)
    npv[core_idx] = bnds_info

def split_bnds_info(bnds_info, cores_no):
    """ cut array with bond definitions into pieces - one piece per core """
    bnds_info_mp = []
    bnds_no = len(bnds_info)
    batch_size = mth.ceil(np.float64(bnds_no) / cores_no)  # number of bonds allocated to one core
    # split array among cores
    for idx in range(cores_no):
        lower_bound = int(idx * batch_size)
        upper_bound = int(np.min([(idx + 1) * batch_size, bnds_no]))
        bnds_info_mp.append(bnds_info[lower_bound : upper_bound])
    # return list of arrays
    return bnds_info_mp

def bnd_calc_npv(bnds_info, cores_no):
    """ dummy valuation function running multicore """
    manager = mp.Manager()
    npv = manager.dict()
    bnds_info_mp = split_bnds_info(bnds_info, cores_no)
    processes = [mp.Process(target=bnd_calc_npv_dummy, args=(bnds_info_mp[core_idx], core_idx, npv)) for core_idx in range(cores_no)]
    [process.start() for process in processes]
    [process.join() for process in processes]
    # return NPV of individual bonds
    return np.hstack(npv.values())

if __name__ == '__main__':
    # create dummy array
    bnds_no = 1200  # number of dummy bonds in the sample
    bnds_info = np.array([100] * bnds_no)

    # one core
    print("ONE CORE")
    start_time = tm.time()
    bnds_no = len(bnds_info)
    tm.sleep(0.0001 * bnds_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # two cores
    print("TWO CORES")
    cores_no = 2
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # three cores
    print("THREE CORES")
    cores_no = 3
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # four cores
    print("FOUR CORES")
    cores_no = 4
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')
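One way to reduce the copying overhead, at least for plain numeric data, is to place the array in shared memory with multiprocessing.Array and let each process write to its own slice. A minimal sketch along those lines (my own variation on the benchmark above, not part of the original code):

import numpy as np
import ctypes
import multiprocessing as mp

def worker(shared_arr, lower_bound, upper_bound):
    """ each process fills its own slice of the shared buffer - the bond data itself is never pickled """
    arr = np.frombuffer(shared_arr.get_obj())        # numpy view on the shared memory
    arr[lower_bound : upper_bound] *= 1.05           # dummy 'valuation'

if __name__ == '__main__':
    bnds_no = 1200
    shared_arr = mp.Array(ctypes.c_double, bnds_no)  # buffer living in shared memory
    np.frombuffer(shared_arr.get_obj())[:] = 100.0   # initialise the dummy NPVs
    cores_no = 4
    batch_size = bnds_no // cores_no
    processes = [mp.Process(target=worker,
                            args=(shared_arr, idx * batch_size, min((idx + 1) * batch_size, bnds_no)))
                 for idx in range(cores_no)]
    [process.start() for process in processes]
    [process.join() for process in processes]
    npv = np.frombuffer(shared_arr.get_obj())        # results are directly visible in the parent

Since the slices do not overlap, the processes never write to the same elements and the Array's lock is not needed here.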
The last piece of code uses Pool instead of Process. The run time is slightly better.
import numpy as np
import time as tm
import multiprocessing as mp

#import pdb
#pdb.set_trace()

def bnd_calc_npv_dummy(bnds_info):
    """ multiple core dummy valuation function (based on single core function) """
    try:
        # get number of bonds
        bnds_no = len(bnds_info)
    except TypeError:
        # a single bond (scalar) was passed in
        bnds_no = 1
    tm.sleep(0.0001 * bnds_no)
    return bnds_info

def bnd_calc_npv(bnds_info, cores_no):
    """ dummy valuation function running multicore """
    pool = mp.Pool(processes = cores_no)
    npv = pool.map(bnd_calc_npv_dummy, bnds_info.tolist())
    # return NPV of individual bonds
    return npv

if __name__ == '__main__':
    # create dummy array
    bnds_no = 1200  # number of dummy bonds in the sample
    bnds_info = np.array([100.0] * bnds_no)

    # one core
    print("ONE CORE")
    start_time = tm.time()
    bnds_no = len(bnds_info)
    tm.sleep(0.0001 * bnds_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # two cores
    print("TWO CORES")
    cores_no = 2
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # three cores
    print("THREE CORES")
    cores_no = 3
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')

    # four cores
    print("FOUR CORES")
    cores_no = 4
    start_time = tm.time()
    npv = bnd_calc_npv(bnds_info, cores_no)
    elapsed_time = tm.time() - start_time
    print(' elapsed time: ' + str(elapsed_time) + 's')
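Part of the reason the Pool version does better is that it reuses a fixed set of worker processes instead of spawning new ones, and pool.map ships the work in chunks. Sending one batch per worker instead of one bond per task cuts the pickling overhead further; a sketch of that variation (my own, parameters chosen arbitrarily):

import numpy as np
import time as tm
import multiprocessing as mp

def bnd_calc_npv_batch(bnds_batch):
    """ price a whole batch of bonds in one task to reduce per-task overhead """
    tm.sleep(0.0001 * len(bnds_batch))
    return bnds_batch

if __name__ == '__main__':
    bnds_info = np.array([100.0] * 1200)
    cores_no = 4
    pool = mp.Pool(processes = cores_no)
    batches = np.array_split(bnds_info, cores_no)           # one batch per worker
    start_time = tm.time()
    npv = np.hstack(pool.map(bnd_calc_npv_batch, batches))
    pool.close()
    pool.join()
    print(' elapsed time: ' + str(tm.time() - start_time) + 's')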
So, my conclusion is that the Python implementation of parallelism is not applicable in real life (I used Python 2.7.13 and Windows 7). Best regards,
Macky
PS: If someone is able to change the code, I will more than happily change my mind...
Multiprocessing works best when parts of a problem can be calculated independently, e.g. with a multiprocessing.Pool.
Every worker process in the pool processes part of the input and returns a result to the master process.
If all processes need to modify data all over the input arrays, then it is likely that the synchronization overhead from the manager destroys any gains from multiprocessing.
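For instance, a minimal sketch of that pattern (illustrative names; the chunks are independent and nothing needs to be synchronized):

import numpy as np
import multiprocessing as mp

def work(chunk):
    """ each worker receives an independent chunk and returns its partial result """
    return chunk * 1.05

if __name__ == '__main__':
    data = np.arange(1200, dtype = np.float64)
    pool = mp.Pool(processes = 4)
    parts = np.array_split(data, 4)                 # independent pieces, no shared state
    result = np.hstack(pool.map(work, parts))       # the master process collects the partial results
    pool.close()
    pool.join()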