Reputation: 53
I'm a noob with multiprocessing, and I'm trying to speed up an old algorithm of mine. It works perfectly fine without multiprocessing, but the moment I try to implement it, the program stops working: it just hangs until I abort the script. Another issue is that it doesn't populate the dataframe: again, it normally works, but with multiprocessing it returns only NaN.
func works well.
```python
import multiprocessing as mp

# Stocks (a list of strings) and the DataFrame df are defined elsewhere at global scope
stockUniverse = list(map(lambda s: s.strip(), Stocks))

def func(i):
    df.at[i, 'A'] = 1
    df.at[i, 'B'] = 2
    df.at[i, 'C'] = 3
    print(i, 'downloaded')
    return True

if __name__ == "__main__":
    print('Start')
    pool = mp.Pool(mp.cpu_count())
    pool.imap(func, stockUniverse)
    print(df)
```
The result is:

```
Index 19    NaN NaN NaN
index 20    NaN NaN NaN
```

and then it stops there until I hit Ctrl+C.
Thanks
Upvotes: 1
Views: 877
Reputation: 44108
The `map` function blocks until all the submitted tasks have completed and returns a list of the return values from the worker function. The `imap` function, however, returns immediately with an iterator that must be iterated to yield the return values one by one as each becomes available. Your original code did not iterate that iterator; it immediately printed out what it expected to be the updated `df`, but you would not have given the tasks enough time to start and complete for `df` to have been modified. In theory, if you had inserted a call to `time.sleep` for a sufficiently long time before the `print` statement, the tasks would have started and completed before you printed `df`. But iterating the iterator is clearly the most efficient way of making sure all tasks have completed, and the only way of getting the return values back.
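To make that difference concrete, here is a minimal, self-contained sketch (a toy `square` worker, not your code) showing that `map` blocks until everything is done, while `imap`'s iterator has to be consumed to get the same guarantee:

```python
import multiprocessing as mp

def square(n):
    return n * n

if __name__ == "__main__":
    with mp.Pool(2) as pool:
        # map blocks until every task has completed and returns a list
        print(pool.map(square, range(5)))     # [0, 1, 4, 9, 16]

        # imap returns an iterator immediately; nothing is guaranteed to
        # have finished until you actually iterate it
        results = pool.imap(square, range(5))
        print(list(results))                  # iterating waits for each result in order
```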
But, as I mentioned in my comment, you have a much bigger problem. The tasks you submit are executed by the worker function `func` being called by processes in the process pool you created, and each of those processes runs in its own address space. You did not tag your question with the platform on which you are running (whenever you tag a question with multiprocessing, you are supposed to also tag it with the platform), but I can infer that you are running on a platform that uses the spawn method to create new processes, such as Windows, which is why you have the `if __name__ == "__main__":` block guarding the code that creates new processes (i.e. the processing pool). When spawn is used to create a new process, a new, empty address space is created, a new Python interpreter is launched, and your source is re-executed from the top (without the `if __name__ == "__main__":` block guarding the pool-creating code, you would get into an infinite, recursive loop of process creation). But this also means that any definition of `df` at global scope outside the `if __name__ == "__main__":` block (which you must have omitted from your post, if you are indeed running under Windows) creates a new, separate instance of `df` in each pool process as it is created.
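Here is a small, self-contained illustration (using a hypothetical `counter` global, not your code) of what spawn implies: each worker re-imports the module and gets its own private copy of anything defined at global scope:

```python
import multiprocessing as mp
import os

counter = 0  # defined at global scope, outside the __main__ guard

def bump(_):
    global counter
    counter += 1                     # updates this worker's private copy only
    return (os.getpid(), counter)

if __name__ == "__main__":
    mp.set_start_method('spawn')     # force spawn for the demonstration
    with mp.Pool(2) as pool:
        print(pool.map(bump, range(4)))            # each pid has its own small counter
    print('counter in the main process:', counter) # still 0
```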
If you are instead running under Linux, where fork is used to create new processes, the story is a bit different. The new processes inherit the original address space from the main process, including all declared variables, but copy-on-write is used: as soon as a subprocess attempts to modify any variable in this inherited storage, a copy of the affected page is made and the process works on its own copy from then on. So again, nothing can be shared for updating purposes.
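A similarly small sketch for the fork case (again with a hypothetical `data` global): the child can read what the parent created, but its write lands on its own copy-on-write page and never reaches the parent:

```python
import multiprocessing as mp

data = {'x': 1}  # created in the parent before the pool forks

def mutate(_):
    data['x'] = 99        # modifies the child's copy-on-write page only
    return data['x']

if __name__ == "__main__":
    mp.set_start_method('fork')       # available on POSIX platforms only
    with mp.Pool(1) as pool:
        print(pool.map(mutate, [0]))  # [99] -- the child saw and changed its own copy
    print(data['x'])                  # still 1 in the parent
```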
You should therefore modify your program to have your worker function return values back to the main process, which will do the necessary updating:
```python
import multiprocessing as mp
import pandas as pd

def func(stock):
    return (stock, (('A', 1), ('B', 1), ('C', 1)))

if __name__ == "__main__":
    stockUniverse = ['abc', 'def', 'ghi', 'klm']
    d = {col: pd.Series(index=stockUniverse, dtype='int32') for col in ['A', 'B', 'C']}
    df = pd.DataFrame(d)

    pool_size = min(mp.cpu_count(), len(stockUniverse))
    pool = mp.Pool(pool_size)
    for result in pool.imap_unordered(func, stockUniverse):
        stock, col_values = result  # unpack
        for col_value in col_values:
            col, value = col_value  # unpack
            df.at[stock, col] = value
    print(df)
```
Prints:

```
     A  B  C
abc  1  1  1
def  1  1  1
ghi  1  1  1
klm  1  1  1
```
Note that I have used `imap_unordered` instead of `imap`. The former is allowed to return the results in arbitrary order (i.e. as they become available) and is generally more efficient, and since the return value contains all the information required to set the correct row of `df`, we no longer need any specific ordering.
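If you ever do need results in submission order, `imap` preserves it while `imap_unordered` yields them as they finish; a quick sketch (toy worker with artificial sleeps, not your code):

```python
import multiprocessing as mp
import time

def work(n):
    time.sleep(0.1 * (4 - n))   # make later-submitted items finish first
    return n

if __name__ == "__main__":
    with mp.Pool(4) as pool:
        print(list(pool.imap(work, range(4))))            # [0, 1, 2, 3]: submission order
        print(list(pool.imap_unordered(work, range(4))))  # e.g. [3, 2, 1, 0]: completion order
```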
But:
If your worker function does little more than download from a website, with very little CPU-intensive processing, then you could (should) use a thread pool instead, by making the simple substitution of:
```python
from multiprocessing.pool import ThreadPool
...
MAX_THREADS_TO_USE = 100  # or maybe even larger!!!
pool_size = min(MAX_THREADS_TO_USE, len(stockUniverse))
pool = ThreadPool(pool_size)
```
And since all threads share the same address space, you could use your original worker function, `func`, as is!
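Putting that together, here is a sketch of what the thread-pool version could look like, assuming the work really is I/O-bound as described; the body of `func` is a stand-in for your real download logic and the stock list is made up:

```python
from multiprocessing.pool import ThreadPool
import pandas as pd

stockUniverse = ['abc', 'def', 'ghi', 'klm']  # stand-in for your real list
df = pd.DataFrame(index=stockUniverse, columns=['A', 'B', 'C'])  # empty frame to fill in

def func(stock):
    # threads share the main process's address space, so updating df
    # directly here is visible to the main thread
    df.at[stock, 'A'] = 1
    df.at[stock, 'B'] = 2
    df.at[stock, 'C'] = 3
    return True

if __name__ == "__main__":
    MAX_THREADS_TO_USE = 100
    pool_size = min(MAX_THREADS_TO_USE, len(stockUniverse))
    with ThreadPool(pool_size) as pool:
        # iterate the iterator so all tasks finish before we print
        for _ in pool.imap_unordered(func, stockUniverse):
            pass
    print(df)
```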
Upvotes: 1