Reputation: 2665
I have a function with multiple parameters: iterable_token, dataframe, and label_array. However, only iterable_token is actually iterated over inside the function.
def cross_tab(label, token_presence):
    A_token = 0
    B_token = 0
    C_token = 0
    D_token = 0
    for i, j in zip(list(label), list(token_presence)):
        if i == True and j == True:
            A_token += 1
        elif i == False and j == False:
            D_token += 1
        elif i == True and j == False:
            C_token += 1
        elif i == False and j == True:
            B_token += 1
    return A_token, B_token, C_token, D_token
def My_ParallelFunction(iterable_token, dataframe, label_array):
    A = {}
    B = {}
    C = {}
    D = {}
    token_count = {}
    token_list = []
    token_presence_sum = 0
    for token in iterable_token:
        try:
            token_presence = dataframe['Master'].str.contains('\\b'+token+'\\b')
            token_presence_sum = sum(token_presence)
            if token_presence_sum:
                A_token, B_token, C_token, D_token = cross_tab(label_array, token_presence)
                A[token] = A_token
                B[token] = B_token
                C[token] = C_token
                D[token] = D_token
                token_count[token] = token_presence_sum
                token_list.append(token)
        except Exception as e:
            pass
    return (A, B, C, D, token_count, token_list)
How do I parallelize the My_ParallelFunction function?
Edit 1: I tried the method suggested in example 1, because that is what I am looking for: parallelizing the whole function.
import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as p:
    results = p.starmap(My_ParallelFunction, (iterable_token, dataframe, label_array))
but the error message is:
RemoteTraceback Traceback (most recent call last)
RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib/python3.6/multiprocessing/pool.py", line 47, in starmapstar
return list(itertools.starmap(args[0], args[1]))
TypeError: My_ParallelFunction() takes 3 positional arguments but 949 were given
"""
The above exception was the direct cause of the following exception:
TypeError Traceback (most recent call last)
<timed exec> in <module>
/usr/lib/python3.6/multiprocessing/pool.py in starmap(self, func, iterable, chunksize)
272 `func` and (a, b) becomes func(a, b).
273 '''
--> 274 return self._map_async(func, iterable, starmapstar, chunksize).get()
275
276 def starmap_async(self, func, iterable, chunksize=None, callback=None,
/usr/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
642 return self._value
643 else:
--> 644 raise self._value
645
646 def _set(self, i, obj):
TypeError: My_ParallelFunction() takes 3 positional arguments but 949 were given
Edit 2: Here is the file I am using. You can download it from here and unzip it. Then run the script below to build the required input parameters. Make sure to install nltk, pandas and numpy, and to change the path to the file TokenFile.csv.
from nltk import word_tokenize, sent_tokenize
import pandas as pd
import numpy as np

dataframe = pd.read_csv('/home/user/TokenFile.csv', nrows=100)

def get_uniquetoken(stop_words, input_doc_list):
    ## get unique words across all documents
    if stop_words:
        unique_words = [word for doc in input_doc_list for sent in sent_tokenize(doc)
                        for word in word_tokenize(sent) if word not in stop_words]
    else:
        unique_words = [word for doc in input_doc_list for sent in sent_tokenize(doc)
                        for word in word_tokenize(sent)]
    unique_words = set(unique_words)
    print('unique_words done! length is:', len(unique_words))
    return unique_words

input_token_list = dataframe['Master'].tolist()
label_array = dataframe['label_array'].tolist()
iterable_token = get_uniquetoken(None, input_token_list)
Edit 3: This is the solution I am using:
def My_ParallelFunction(iterable_token, dataframe, label_array):
    A = {}
    B = {}
    C = {}
    D = {}
    token_count = {}
    token_list = []
    with mp.Pool(4) as p:
        token_result = p.starmap(_loop, [(token, dataframe, label_array, A, B, C, D, token_count, token_list) for token in iterable_token])
    #print(token_result[0])
    return token_result  #(A,B,C,D,token_count,token_list)

def _loop(token, dataframe, label_array, A, B, C, D, token_count, token_list):
    #print(token)
    try:
        token_presence = dataframe['Master'].str.contains('\\b'+token+'\\b')
        token_presence_sum = sum(token_presence)
        #print(token_presence_sum)
        if token_presence_sum:
            A_token, B_token, C_token, D_token = cross_tab(label_array, token_presence)
            #print('token,A_token,B_token,C_token,D_token',token,A_token,B_token,C_token,D_token)
            A[token] = A_token
            B[token] = B_token
            C[token] = C_token
            D[token] = D_token
            token_count[token] = token_presence_sum
            token_list.append(token)
            #print('token_list:',token_list)
    except Exception as e:
        pass
    return A, B, C, D, token_count, token_list
However, it is not giving me the result I want: I get a 949 × 6 × (varying sizes) structure, one tuple per token, instead of the six aggregate structures.
Upvotes: 2
Views: 3183
Reputation: 10789
Here are two toy examples to show how you can parallelize a similar function.
First option: if you want to parallelize the whole function, you can do that with Pool.starmap(). starmap() works like map(), but it lets you pass multiple arguments to each call.
from multiprocessing import Pool
import time

# Example 1: simple function parallelization
def f(a, b, c, _list):
    x = a + b + c
    time.sleep(1)
    _list.append(x)
    return _list

inputs = [
    (1, 2, 3, ['a', 'b', 'c']),
    (1, 2, 3, ['d', 'e', 'f']),
    (1, 2, 3, ['x', 'y', 'z']),
    (1, 2, 3, ['A', 'B', 'C']),
]

start = time.time()
with Pool(4) as p:
    results = p.starmap(f, inputs)
end = time.time()

for r in results:
    print(r)
print(f'done in {round(end-start, 3)} seconds')
Output:
['a', 'b', 'c', 6]
['d', 'e', 'f', 6]
['x', 'y', 'z', 6]
['A', 'B', 'C', 6]
done in 1.084 seconds
Second option: if you want to parallelize only the for loop inside the function, rewrite the loop body as a function of a single element and call it with Pool.map() or Pool.starmap().
# Example 2: function calling a parallelized loop

# loop body
def g(_string):
    time.sleep(1)
    return _string + '*'

# outer function
def f(a, b, c, _list):
    x = a + b + c
    _list.append(str(x))
    # loop parallelization
    with Pool(4) as p:
        new_list = p.map(g, _list)
    return new_list

start = time.time()
result = f(1, 2, 3, ['a', 'b', 'c'])
end = time.time()

print(result)
print(f'done in {round(end-start, 3)} seconds')
Output:
['a*', 'b*', 'c*', '6*']
done in 1.048 seconds
Note that the "loop function" contains the logic to deal with a single element of the iterable; Pool.map() takes care of running it for all the elements.
The time.sleep(1) calls simulate a time-consuming calculation. If the parallelization works, you should be able to process 4 inputs in 1 second rather than in 4 seconds.
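A quick way to verify is to time the same four calls serially. A minimal sketch, meant to be run with the f and inputs from the first example:

# Serial baseline (run with the definitions from Example 1):
# each call sleeps 1 second, so four calls take ~4 seconds in
# sequence instead of ~1 second in parallel.
start = time.time()
serial_results = [f(*args) for args in inputs]
end = time.time()
print(f'done in {round(end-start, 3)} seconds')  # ~4.0 seconds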
Here is an example using your code:
def My_ParallelFunction(iterable_token, dataframe, label_array):
    with mp.Pool(4) as p:
        token_result = p.starmap(
            _loop,
            [(token, dataframe, label_array) for token in iterable_token]
        )
    return token_result

def _loop(token, dataframe, label_array):
    A = {}
    B = {}
    C = {}
    D = {}
    token_count = {}
    token_list = []
    try:
        token_presence = dataframe['Master'].str.contains('\\b'+token+'\\b')
        token_presence_sum = sum(token_presence)
        if token_presence_sum:
            A_token, B_token, C_token, D_token = cross_tab(label_array, token_presence)
            A[token] = A_token
            B[token] = B_token
            C[token] = C_token
            D[token] = D_token
            token_count[token] = token_presence_sum
            token_list.append(token)
        return A, B, C, D, token_count, token_list
    except Exception as e:
        print(e)
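Note that token_result here is a list with one (A, B, C, D, token_count, token_list) tuple per token, or None for tokens that raised an exception, which is the 949 × 6 shape you describe in Edit 3. If you want the six aggregate structures of the original serial function back, a minimal merge sketch (untested, assuming the _loop above) could look like this:

def merge_results(token_result):
    # Fold the per-token tuples back into the six aggregates
    # returned by the original serial function.
    A, B, C, D, token_count, token_list = {}, {}, {}, {}, {}, []
    for res in token_result:
        if not res:
            continue  # this token raised an exception
        a, b, c, d, count, tokens = res
        A.update(a)
        B.update(b)
        C.update(c)
        D.update(d)
        token_count.update(count)
        token_list.extend(tokens)
    return A, B, C, D, token_count, token_list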
Upvotes: 4
Reputation: 338
Something along these lines should work if you wish to multiprocess only the for loop and not the entire function.
from multiprocessing import Pool

# The worker is defined at module level so multiprocessing can pickle it;
# a function nested inside My_ParallelFunction would fail to pickle.
def get_token_counts(token, dataframe, label_array):
    try:
        token_presence = dataframe['Master'].str.contains('\\b'+token+'\\b')
        token_presence_sum = sum(token_presence)
        if token_presence_sum:
            A_token, B_token, C_token, D_token = cross_tab(label_array, token_presence)
            # Return the count as well, so the caller can fill token_count.
            return token, A_token, B_token, C_token, D_token, token_presence_sum
    except Exception as e:
        print(e)

def My_ParallelFunction(iterable_token, dataframe, label_array):
    A = {}
    B = {}
    C = {}
    D = {}
    token_count = {}
    token_list = []
    with Pool() as p:
        p_results = p.starmap(get_token_counts, [(token, dataframe, label_array) for token in iterable_token])
    for res in p_results:
        if res is None:
            continue
        token, A_token, B_token, C_token, D_token, token_presence_sum = res
        A[token] = A_token
        B[token] = B_token
        C[token] = C_token
        D[token] = D_token
        token_count[token] = token_presence_sum
        token_list.append(token)
    return (A, B, C, D, token_count, token_list)
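Here is a rough usage sketch, assuming the variables iterable_token, dataframe and label_array built in your Edit 2, and the cross_tab function, are defined in the same module:

if __name__ == '__main__':
    # Inputs come from the Edit 2 setup script.
    A, B, C, D, token_count, token_list = My_ParallelFunction(
        iterable_token, dataframe, label_array
    )
    print(len(token_list), 'tokens had at least one match')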
I removed the part where you add elements to lists and dictionaries from the worker function, since you'd have to look into queues or shared objects in order to multiprocess appending to a list or a dict. It's more work, but it should make your code run slightly faster (it all depends on how many elements you have in your iterable and what takes a long time to compute).
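For completeness, if you did want the workers to write into shared structures directly, here is a minimal, hypothetical sketch of the shared-object approach using multiprocessing.Manager (the toy worker just stores the token length; a real one would store the cross_tab counts instead):

from multiprocessing import Pool, Manager

def count_token(token, shared_dict):
    # Each process writes into the same Manager-backed dict.
    shared_dict[token] = len(token)

if __name__ == '__main__':
    with Manager() as m:
        shared = m.dict()
        with Pool(4) as p:
            p.starmap(count_token, [(t, shared) for t in ['foo', 'bar', 'baz']])
        print(dict(shared))  # entries written by different processes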
The idea behind this code is that you create a worker function get_token_counts that runs in every process, given a token, a dataframe and a label_array. The return value of the worker contains everything needed to add its results to the dictionaries: since you can't know which worker finishes first, returning token along with the counts solves the indexing problem. (In fact, starmap does return results in the order of the inputs, as the quick check below shows, so this is mainly for robustness and readability.) Once all elements have been computed, you add them to your lists and dicts.
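Quick check that map()/starmap() preserve input order even when workers finish at random times:

from multiprocessing import Pool
import time, random

def slow_identity(x):
    time.sleep(random.random())  # workers finish in random order
    return x

if __name__ == '__main__':
    with Pool(4) as p:
        print(p.map(slow_identity, range(8)))  # always [0, 1, ..., 7]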
This is basically multiprocessing some of the dataframe operations along with cross_tab, not exactly My_ParallelFunction.
Since you gave no example, I can't really test the code and come up with something better.
Upvotes: 0