Reputation: 35
I want to run a function in multiple processes in Python. Here is an example:
def myFunction(name,age):
    output = paste(name,age)
    return output
names = ["A","B","C"]
ages = ["1","2","3"]
with mp.Pool(processes=no_cpus) as pool:
    results = pool.starmap(myFunction,zip(names,ages))
results_table = pd.concat(results)
results_table.to_csv(file,sep="\t",index=False)
In the real case, myFunction takes a really long time. Sometimes I have to interrupt the run and start again. However, the results are only written to the output file once pool.starmap has finished. How can I store the intermediate results before it finishes?
I don't want to change myFunction from using return to calling .to_csv().
Thanks!
Upvotes: 0
Views: 906
Reputation: 44283
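Instead of using map (or starmap, as in your code), use the imap method. It returns an iterator that, when iterated, yields the results one by one as they become available (i.e. as they are returned by myFunction). However, the results are still yielded in submission order, so a result is only delivered once all earlier ones have been delivered. If you do not care about the order, then use imap_unordered instead. Note that imap passes a single argument to the worker function, so myFunction now takes a tuple and unpacks it.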
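To make the ordering difference concrete, here is a minimal sketch. The worker slow_identity and the helper collect are hypothetical names used only for this illustration; the sleep stands in for the long-running work.

```python
import multiprocessing as mp
import time

def slow_identity(delay):
    # Hypothetical worker: sleeps for `delay` seconds, then returns `delay`.
    time.sleep(delay)
    return delay

def collect(delays, unordered=False):
    # Return the results in the order the pool's iterator yields them.
    with mp.Pool(processes=len(delays)) as pool:
        method = pool.imap_unordered if unordered else pool.imap
        return list(method(slow_identity, delays))

# Required for Windows:
if __name__ == '__main__':
    delays = [0.3, 0.2, 0.1]
    # imap: always the submission order, i.e. [0.3, 0.2, 0.1]
    print(collect(delays))
    # imap_unordered: completion order, so shorter tasks usually come first
    print(collect(delays, unordered=True))
```

With imap, a slow first task holds back the results behind it; with imap_unordered, each result can be written out as soon as its task finishes.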
In the following code, each DataFrame is written to the CSV file as soon as it is returned and iterated. The header is written only for the first result; every later result is appended without a header.
import pandas as pd
import multiprocessing as mp

def paste(name, age):
    return pd.DataFrame([[name, age]], columns=['Name', 'Age'])

def myFunction(t):
    name, age = t # unpack passed tuple
    output = paste(name, age)
    return output

# Required for Windows:
if __name__ == '__main__':
    names = ["A","B","C"]
    ages = ["1","2","3"]
    no_cpus = min(len(names), mp.cpu_count())
    csv_file = 'test.txt'
    with mp.Pool(processes=no_cpus) as pool:
        # Results from imap must be iterated
        for index, result in enumerate(pool.imap(myFunction, zip(names,ages))):
            if index == 0:
                # First return value
                header = True
                open_flags = "w"
            else:
                header = False
                open_flags = "a"
            with open(csv_file, open_flags, newline='') as f:
                result.to_csv(f, sep="\t", index=False, header=header)
Output of test.txt:
Name Age
A 1
B 2
C 3
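Since you mention interrupting the run and starting again, the same idea can be extended so that a restart skips the work that is already in the file. This is only a sketch under some assumptions: the helpers completed_names and append_results are hypothetical, the Name column is assumed to identify a task uniquely, and each row is assumed to have been written completely before the interruption.

```python
import os
import multiprocessing as mp
import pandas as pd

def completed_names(csv_file):
    # Names already present in the output of a previous run
    # (empty set if the file is missing or empty).
    if not os.path.exists(csv_file) or os.path.getsize(csv_file) == 0:
        return set()
    done = pd.read_csv(csv_file, sep="\t", dtype=str)
    return set(done["Name"])

def append_results(results, csv_file):
    # Append each DataFrame as it arrives; write the header only
    # when the file does not exist yet or is empty.
    need_header = not os.path.exists(csv_file) or os.path.getsize(csv_file) == 0
    count = 0
    for result in results:
        with open(csv_file, "a", newline="") as f:
            result.to_csv(f, sep="\t", index=False, header=need_header)
        need_header = False
        count += 1
    return count

def myFunction(t):
    # Stand-in for the real long-running function.
    name, age = t
    return pd.DataFrame([[name, age]], columns=["Name", "Age"])

# Required for Windows:
if __name__ == '__main__':
    names = ["A", "B", "C"]
    ages = ["1", "2", "3"]
    csv_file = 'test.txt'
    done = completed_names(csv_file)
    # Only submit the tasks whose results are not in the file yet.
    todo = [(n, a) for n, a in zip(names, ages) if n not in done]
    if todo:
        with mp.Pool(processes=min(len(todo), mp.cpu_count())) as pool:
            append_results(pool.imap_unordered(myFunction, todo), csv_file)
```

Because the file is always opened in append mode here, delete it first when you want a completely fresh run. imap_unordered is used since the rows of a resumed run are no longer in the original order anyway; switch to imap if the order within a single run matters.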
Upvotes: 1