Reputation: 541
I have a user-defined function (UDF) that takes a while to run - like a day or so. Additionally, that UDF needs to be applied repeatedly to different data sets. To be specific, my function opens a data set, edits it, then saves the data set. It doesn't return anything.
What are the best ways to speed it up? Would it be better if I open up like 3 Jupyter Notebooks and run 3 at a time - each on a different data set? Or would it be better to do parallel processing using the multiprocessing package?
I tried following the guide here: https://www.machinelearningplus.com/python/parallel-processing-python/ for parallel processing, but it is not working. Not wanting to edit my original UDF, I basically just created another user-defined function called loop() like so:
def loop( i ):
    UDF( i, <...some other args that are not dynamic...> )
Then I tried to do asynchronous parallel processing using the .map_async() method, like so. Let's say I have 10 sets of data that I need to go over, and I stored the names of my data sets in a list called datalist:
import multiprocessing as mp
pool = mp.Pool( mp.cpu_count() )
pool.map_async( loop, [i for i in datalist] )
All I got in return after executing my code is:
<multiprocessing.pool.MapResult at 0x18cb2975160>
What am I doing wrong? And which way is better? I am really new at all this programming stuff. Sorry if my terminology is off as I am self-teaching.
Upvotes: 1
Views: 943
Reputation: 1
No one can persuade you not to run multiple Jupyter kernels, yet the (poor) performance of doing so might convince you not to extend this tactic to days-long processing pipelines, except for hobby projects.
The most recent Python has multiprocessing well tuned for doing the job at scale, yet some common sense is needed (the number of CPU-cores need not be the best choice for spawning the "best-performing" setup; the same applies to RAM-sizes and CPU-cache re-use efficiency, which impact any generic code execution and where the impacts can easily reach 3+(!) orders of magnitude in runtimes). A maxtasksperchild parameter may be quite important for stable long runs, due to better resources-management in the remote-processes.
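Just to illustrate the maxtasksperchild idea (a minimal sketch only; the worker-count is an assumption, not a measurement, and loop() / datalist are borrowed from the question), the Pool can be told to recycle each worker process after a fixed number of finished tasks, so that leaked memory or open file handles cannot accumulate over a days-long run:

import multiprocessing as mp

if __name__ == '__main__':
    # assumption: 3 workers, in case each UDF() call is RAM-heavy; tune this to your machine
    with mp.Pool( processes        = 3,
                  maxtasksperchild = 1         # replace a worker-process after each finished data set
                  ) as pool:
        pool.map( loop, datalist )             # the loop() wrapper and datalist, as defined in the question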
I would advocate not complicating the story and, to protect your many-days processing (crashes are expensive), using a context-manager "fusing" around the Pool:
from multiprocessing import Pool, TimeoutError
import sys
import time
#############################################################################
#                          # The "right"-amount of CPU-cores depends (a LOT) on type of work
nPROCs = 4                 # configurable, do not "over-subscribe" the CPU-cores

def UDF( ... ):            # The Work takes place here :o)
    ...
    return                 # void / a PROCEDURE-alike UDF

def looper( p = ( -1, None ) ):    # the utility-LOOP-er
    UDF( p[1], <...> )             # the Work, having static parameters
    return p[0]                    # RETs a "self.Signature" to self-present
if __name__ == '__main__':
    #----------------------------------aPoolOfWorkPROCESSes----CONTEXT-MANAGER-fusing---
    with Pool( processes = nPROCs ) as aPoolOfWorkPROCESSes:    # start nPROCs processes
        #---------------------------------------------------------------------<BEGIN>
        resultObj = aPoolOfWorkPROCESSes.map_async(  # method of scheduling
                        looper,                      # JOB: a Work to be done ( looper(), not UDF itself, as it unpacks the ( id, data )-tuple )
                        [ ( dataset.index( i ),      # signature "id"
                            i                        # dataset-part
                            ) for i in dataset       # dataset ~ the list of data-set names ( the asker's datalist )
                          ]                          # ITERABLE: defined.
                        )
        #---------------------------------------------------------------------
        # here, still inside the context-fusing, we have a "resultObj"-Object
        #                                         the work is being performed
        #                                         in "remote"-processes async to __main__
        # that means we can stay waiting for results and report progress by asking
        SECs  = 5
        eMOD1 = 60
        eMOD2 = 60
        eCNT  = 0
        rCNT  = 1
        MASKr = "\n( {2:} ):: INF[{1:>3d}.]: UDF( {0:>2d}, dataset[{0:>2d}] ) has completed\n"
        MASKb = "\n\nUSR: yes, Sir, going to break the loop, as you wished after {0:} UDF() calls reported to finish so far. The rest may still arrive from Pool()-instance's resultObj, using the .get()-method"
        MASK0 = "_"
        MASK1 = "."
        MASK2 = "{0:}:{1:}{2:}".format( ( eMOD1 - 1 ) * chr( 8 ),
                                        ( eMOD1 - 1 ) * MASK0,
                                        ( eMOD1 - 1 ) * chr( 8 )
                                        )
        MASK3 = "{0:}|{1:}{2:}".format( ( eMOD2 - 1
                                        + eMOD1 - 1 ) * chr( 8 ),
                                        ( eMOD1 - 1 ) * MASK0 + ( eMOD2 - 1 ) * chr( 32 ),
                                        ( eMOD2 - 1
                                        + eMOD1 - 1 ) * chr( 8 )
                                        )
print "\n{0:} may use (Ctrl+C) to exit from this result-awaiting looper".format( time.ctime() )
while True:
try:
print( MASKr.format(
resultObj.get( timeout = SECs ), # either print()-s, as it gets "id", or throws EXC: after SECs-timeout
rCNT, # rCNT counter
time.ctime() # .ctime() timestamp
)
)
rCNT += 1 # .INC returned results counter
continue # LOOP-waiting results from .get()
except TimeoutError: #----------------------------- EXC expected, a plain elapsed timeout event
################ TimeoutError handler:
# we expect this TimeoutError to happen quite often, each SECs-timeout seconds
eCNT +=1 # TimeoutError counter .INC-ed
sys.stdout.write( MASK1
if ( 0 < eCNT % eMOD1 ) else MASK2
if ( 0 < eCNT % ( eMOD1 * eMOD2 ) ) else MASK3
)
sys.stdout.flush() # enforce the terminal UI-update
except KeyboardInterrupt: #--------------------------------------- EXC-type expected, user hits Ctrl+C
print( MASKb.format( rCNT-1 ) ) # we used Ctrl+C to stop the loop
break # terminate the while()-loop
except: #--------------------------------------------------------- EXC-type unexpected, re-raised from a "remote"-process call via a .get()-method above
print( "\nEXC from a 'remote'-process" ) # we might have got a re-thrown EXC from a "remote"-process, via .get()-method
continue # LOOP-waiting results from .get()
print "\n{0:} used (Ctrl+C) to request an exit from result-awaiting looper - remaining results still may become available from 'remote'-process(es) via calls to the resultObj.get() method".format( time.ctime() )
#-------------------------------------------------------------------------------<END>
#----------------------------------aPoolOfWorkPROCESSes----CONTEXT-MANAGER-fusing---
print( 60*"/\\" )
Upvotes: 1