How does Parallel Processing vs. running several python-kernels work in Python?

I have a user-defined function (UDF) that takes a while to run - like a day or so. Additionally, that UDF needs to be applied repeatedly to different data sets. To be specific, my function opens a data set, edits it, then saves the data set. It doesn't return anything.

What are the best ways to speed it up? Would it be better if I open up like 3 Jupyter Notebooks and run 3 at a time - each on a different data set? Or would it be better to do parallel processing using the multiprocessing package?

I tried following the guide here: https://www.machinelearningplus.com/python/parallel-processing-python/ for parallel processing but it is not working. Not wanting to edit my original UDF, I basically just created another user-defined function called loop() like so:

def loop( i ):
     UDF( i, <...some other args that are not dynamic...> )

Then I tried to do asynchronous parallel processing using the .map_async() method like so. Let's say I have 10 sets of data that I need to go over, and I stored the names of my data sets in a list called datalist:

import multiprocessing as mp

pool = mp.Pool( mp.cpu_count() )

pool.map_async( loop, [i for i in datalist] )

All I got in return after executing my code is:

<multiprocessing.pool.MapResult at 0x18cb2975160>

What am I doing wrong? And which way is better? I am really new at all this programming stuff. Sorry if my terminology is off, as I am self-teaching.

Upvotes: 1

Views: 943

Answers (1)

user3666197

Reputation: 1

Intro :

No one can persuade you not to run multiple Jupyter kernels, yet the (poor) performance of doing so might convince you not to extend this tactic to many-days-long processing pipelines, except for some hobby projects.

Using Pools :

Recent Python has the multiprocessing module well tuned for doing this job at scale, yet some common sense is needed ( the number of CPU-cores need not be the best choice for spawning the "best-performing" setup; the same applies to RAM sizes and CPU-cache re-use efficiency, which affect any generic code execution and can easily shift runtimes by 3+(!) orders of magnitude ). The maxtasksperchild parameter may be quite important for stable long runs, due to better resource management in the "remote"-processes.
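For illustration only, a minimal sketch of just those two Pool()-parameters ( the worker name process_one() and the choice of 3 processes are assumptions for the example, not a recommendation ) :

from multiprocessing import Pool

def process_one( aDataSET ):                 # hypothetical worker, standing in for the real UDF-wrapper
    ...                                      # open / edit / save one data set
    return aDataSET

if __name__ == '__main__':
    with Pool( processes        = 3,         # fewer workers than cpu_count() may run faster for RAM-heavy work
               maxtasksperchild = 1          # recycle each worker process after one long task
               ) as pool:
        pool.map( process_one, [ "A", "B", "C" ] )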

I would advocate not complicating the story, and protecting your many-days processing (crashes are expensive) by using a context-manager "fusing" of the Pool :

from multiprocessing import Pool, TimeoutError
import sys
import time
#############################################################################
#                       # The "right" amount of CPU-cores depends (a LOT) on the type of work
nPROCs = 4              # configurable, do not "over-subscribe" the CPU-cores

def UDF( aDataSET ):              # The Work takes place here :o)
    ...                           #     ( open, edit and save the data set; add the static args as needed )
    return                        # void / a PROCEDURE-alike UDF

def looper( p = ( -1, None ) ):   # the utility-LOOP-er
    UDF(    p[1] )                # the Work ( pass the other, static parameters here )
    return  p[0]                  # RETs a "self.Signature" to self-present


if __name__ == '__main__':

    dataset = [ ... ]             # <-- put your list of data set names here ( the datalist from the question )

    #----------------------------------aPoolOfWorkPROCESSes----CONTEXT-MANAGER-fusing---
    with Pool( processes        = nPROCs,                    # start nPROCs processes
               maxtasksperchild = 1                          # recycle a worker after each long task
               ) as aPoolOfWorkPROCESSes:
        #-------------------------------------------------------------------------------<BEGIN>
        resultObj = aPoolOfWorkPROCESSes.map_async(              # method of scheduling
                                         looper,                 # JOB: a Work to be done ( the utility-LOOP-er, not the UDF itself )
                                         [ ( i,                  #   signature "id"
                                             aDataSET            #   dataset-part
                                             ) for i, aDataSET in enumerate( dataset )
                                           ]                     # ITERABLE: defined.
                                         )
        #-------------------------------------------------------------------------------
        # here, still inside the context-fusing, we have a "resultObj"-Object
        #                                       the work is being performed
        #                                       in "remote"-processes async to __main__
        # that means we can stay waiting for results and report progress by asking

        SECs  =  5
        eMOD1 = 60
        eMOD2 = 60
        eCNT  =  0
        rCNT  =  1
        MASKr = "\n( {2:} ):: INF[{1:>3d}.]: all UDF() calls have completed, looper signatures: {0:}\n"
        MASKb = "\n\nUSR: yes, Sir, going to break the loop, as you wished, after {0:} timeout-events so far. The results may still be collected from the Pool()-instance's resultObj, using the .get()-method"
        MASK0 = "_"
        MASK1 = "."
        MASK2 = "{0:}:{1:}{2:}".format( ( eMOD1 - 1 ) * chr( 8 ),
                                        ( eMOD1 - 1 ) * MASK0,
                                        ( eMOD1 - 1 ) * chr( 8 )
                                        )
        MASK3 = "{0:}|{1:}{2:}".format( ( eMOD2 - 1
                                        + eMOD1 - 1 ) * chr( 8 ),
                                        ( eMOD1 - 1 ) * MASK0 + ( eMOD2 - 1 ) * chr( 32 ),
                                        ( eMOD2 - 1
                                        + eMOD1 - 1 ) * chr( 8 )
                                        )
        print( "\n{0:}: you may use (Ctrl+C) to exit from this result-awaiting looper".format( time.ctime() ) )
        while True:
              try:
                   print( MASKr.format(
                                resultObj.get( timeout = SECs ), # either returns the complete list of "id"-s at once, or throws EXC: after SECs-timeout
                                rCNT,                            #              rCNT     counter
                                time.ctime()                     #              .ctime() timestamp
                                )
                          )
                   rCNT += 1                                     # .INC returned results counter
                   break                                         # all results have arrived, leave the while()-loop

              except                TimeoutError: #----------------------------- EXC expected, a plain elapsed timeout event
                   ################ TimeoutError handler:
                   # we expect this TimeoutError to happen quite often, each SECs-timeout seconds
                   eCNT +=1       # TimeoutError counter .INC-ed
                   sys.stdout.write(                                          MASK1
                                     if ( 0 < eCNT %   eMOD1 )           else MASK2
                                     if ( 0 < eCNT % ( eMOD1 * eMOD2 ) ) else MASK3
                                     )
                   sys.stdout.flush()                            # enforce the terminal UI-update

              except KeyboardInterrupt: #--------------------------------------- EXC-type expected, user hits Ctrl+C
                   print( MASKb.format( eCNT ) )                 # we used Ctrl+C to stop the loop
                   break                                         # terminate the while()-loop

              except Exception as exc: #---------------------------------------- EXC-type unexpected, re-raised from a "remote"-process call via the .get()-method above
                   print( "\nEXC from a 'remote'-process: {0:}".format( exc ) ) # a re-thrown EXC from a "remote"-process, via the .get()-method
                   break                                         # .get() would keep re-raising the same EXC, so leave the while()-loop

        print( "\n{0:}: the result-awaiting looper has ended - any remaining results may still be collected from the 'remote'-process(es) via calls to the resultObj.get() method".format( time.ctime() ) )
        #-------------------------------------------------------------------------------<END>
    #----------------------------------aPoolOfWorkPROCESSes----CONTEXT-MANAGER-fusing---
    print( 60*"/\\" )
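If a per-dataset progress report is what you are after ( one message as each UDF() call finishes ), note that a single .map_async()-call's .get() returns all results at once; a Pool.imap_unordered()-based variant can report them one by one instead. A minimal sketch, assuming the same looper(), dataset and nPROCs as above :

if __name__ == '__main__':
    with Pool( processes = nPROCs ) as aPoolOfWorkPROCESSes:
        #  .imap_unordered() yields each looper()-"signature" as soon as that call completes
        for aSignature in aPoolOfWorkPROCESSes.imap_unordered(
                                               looper,
                                               [ ( i, aDataSET ) for i, aDataSET in enumerate( dataset ) ]
                                               ):
            print( "( {0:} ):: UDF( {1:>2d}, dataset[{1:>2d}] ) has completed".format( time.ctime(),
                                                                                       aSignature
                                                                                       )
                   )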

Upvotes: 1
