Reputation: 175
I need to parallelize a for loop. My current code loops through a list of ids from an xarray Dataset, selects the row data for the corresponding id, calls a function that fits a triangular distribution to the data, appends the resulting distribution to a list, and, once done, turns the list into an xarray Dataset in which each result is linked to its id, so that this Dataset can later be merged by id into the "main" dataset.
My piece of code looks a little like this:
from sklearn.preprocessing import MinMaxScaler
import xarray as xr
import scipy.stats as st
import numpy as np

def call_func(data):
    # normalize the data to [0, 1] before fitting
    scaler = MinMaxScaler()
    norm_data = scaler.fit_transform(np.reshape(data, (len(data), 1)))
    # fit a triangular distribution and freeze it with the fitted parameters
    params = st.triang.fit(norm_data)
    arg, loc, scale = params[:-2], params[-2], params[-1]
    dist = st.triang(*arg, loc=loc, scale=scale)
    return dist
if __name__ == "__main__":
    result_list = []
    for id in my_dataset['id'].values:
        row_data = my_dataset.sel(id=id)['data'].values[0]
        # skip rows that are too short or all zeros
        if len(row_data) > 3 and not np.all(row_data == 0):
            result = call_func(row_data)
            result_list.append(result)
        else:
            result_list.append([])
    new_dataset = xr.Dataset({'id': my_dataset['id'].values,
                              'dist_data': (['id', 'dist'],
                                            np.reshape(np.array(result_list),
                                                       (len(result_list), 1)))
                              })
As the id array is huge, I want to parallelize the loop. This is a fairly generic question, but I am new to the multiprocessing module. Do you have a recommendation on how to combine multiprocessing with this task? My research made it quite obvious that multiprocessing and appending to a shared list is not the smartest thing to do.
Upvotes: 3
Views: 5846
Reputation: 13403
I'll try to give a simple dummy example, hoping you can deduce the modifications needed for your code.
Here's a regular loop version of the code:
id_array = [*range(10)]
result = []
for id in id_array:
    if id % 2 == 0:
        result.append((id, id))
    else:
        result.append((id, id ** 2))
print(result)
Output:
[(0, 0), (1, 1), (2, 2), (3, 9), (4, 4), (5, 25), (6, 6), (7, 49), (8, 8), (9, 81)]
Here, using ProcessPoolExecutor, I parallelized it over 4 processes:
from concurrent.futures import ProcessPoolExecutor

id_array = [*range(10)]

def myfunc(id):
    if id % 2 == 0:
        return id, id
    else:
        return id, id ** 2

# the guard is required when worker processes are spawned
# rather than forked (e.g. on Windows)
if __name__ == "__main__":
    result = []
    with ProcessPoolExecutor(max_workers=4) as executor:
        # map runs myfunc on each id in a worker process and
        # yields the results in input order
        for r in executor.map(myfunc, id_array):
            result.append(r)
    print(result)
Output (the same):
[(0, 0), (1, 1), (2, 2), (3, 9), (4, 4), (5, 25), (6, 6), (7, 49), (8, 8), (9, 81)]
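Since your id array is huge, it may also help to pass a chunksize to map, so that several ids are shipped to each worker per round trip instead of one at a time. A minimal sketch (the value 100 is an arbitrary assumption you would tune):

with ProcessPoolExecutor(max_workers=4) as executor:
    # chunksize batches items per task, reducing inter-process
    # overhead for long input iterables (the default is 1)
    result = list(executor.map(myfunc, id_array, chunksize=100))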
Basically:
- move the body of the for loop into a function that returns the desired value
- create a ProcessPoolExecutor
- feed the function and the iterable to it with executor.map(myfunc, id_array)
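Applied to your code, the loop body would move into a worker that returns (id, dist) pairs. A rough sketch under some assumptions (the helper name worker is mine, and both the (id, row) pairs and the frozen distributions returned by call_func must be picklable to cross process boundaries):

from concurrent.futures import ProcessPoolExecutor
import numpy as np

def worker(pair):
    # receives a pre-extracted (id, row_data) tuple so the workers
    # never have to touch the xarray Dataset themselves
    id, row_data = pair
    if len(row_data) > 3 and not np.all(row_data == 0):
        return id, call_func(row_data)
    return id, []

if __name__ == "__main__":
    # extract plain numpy rows in the parent process
    pairs = [(id, my_dataset.sel(id=id)['data'].values[0])
             for id in my_dataset['id'].values]
    with ProcessPoolExecutor(max_workers=4) as executor:
        # results come back in the same order as pairs, so the
        # id-to-distribution link is preserved
        result_list = list(executor.map(worker, pairs))

From there you can split the (id, dist) pairs back apart and build the new Dataset exactly as in your sequential version.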
Upvotes: 5