Sparke

Reputation: 11

I am trying to use Python multiprocessing to run some jobs at the same time and others sequentially, then print the total time taken at the end.

I am trying to run 6 Python processes: 4 finish very quickly and 2 take a bit longer. I'm asking about both parts now so this won't become a follow-up question.

I would like to run the 4 quick tasks one after the other, and run that batch alongside the 2 tasks that take longer. All the functions are unique and none of them is iterative.

The code below works, but it will run on 6 cores. I need to consider that my users will only have 4 cores, and I must not slow their machines to a crawl.

I hope the brackets explain what I would like to achieve: p1 + p2 + (p3 -> p4 -> p5 -> p6)

At the very end I would like to print out the completion time. Right now it prints the completion time as the processes start, once for each job, instead of waiting for all the jobs to complete and printing once.

I have attempted the async approach, but before I even reached the point of splitting up the jobs I ran into an issue where it appears to complete yet does no work at all (the files are not processed), although it does print the completion message once at the end, as I want.

All the help I find online shows iterative processes and treats all the jobs as equal. I am also quite new to Python and could probably find my answer if I knew the correct terms for my problem.

    if __name__ == '__main__':
        p1 = Process(target=fjob1)
        p1.start()
        p2 = Process(target=fjob2)
        p2.start()
        p3 = Process(target=fjob3)
        p3.start()
        p4 = Process(target=fjob4)
        p4.start()
        p5 = Process(target=fjob5)
        p5.start()
        p6 = Process(target=fjob6)
        p6.start()
    
    print('All processed in '+str(round(time.process_time() - OverallStart,1))+' seconds')
    
    
    if __name__ == '__main__':
        with Pool() as pool:
            p1 = pool.map_async(fjob1,range(1))
            p2 = pool.map_async(fjob2,range(1))
            p3 = pool.map_async(fjob3,range(1))
            p4 = pool.map_async(fjob4,range(1))
            p5 = pool.map_async(fjob5,range(1))
            p6 = pool.map_async(fjob6,range(1))
            print('All processed in '+str(round(time.process_time() - OverallStart,1))+' seconds')

Process does the work but prints at the wrong time, and map_async doesn't do the work at all.

Adding the full set of code, comments and desired results

    # import required modules
    import os
    import pandas as pd
    import warnings
    from datetime import date
    import time
    from multiprocessing import Process
    from multiprocessing.pool import Pool
    warnings.simplefilter("ignore")
    OverallStart  = time.process_time()
    
    
    # Get User Paths
    ORIGINALPATH = "c:/OriginalPATH/"
    FINALPATH = "c:/FinalPATH/"
    CONVERTED = FINALPATH+"Converted/"
    PROCESSED = FINALPATH+"Processed/"
    
    #Filenames
    job1 = "Refused Details"
    job2 = "CSA Details"
    job3 = "CSAT Details"
    job4 = "RCR Details"
    job5 = "AHT Details"
    job6 = "Transferred Details"
    C = "Converted"
    D = "Processed"
    X = ".xlsx"
    V = ".csv"
    DT = date.today().strftime("_%d_%m_%Y")
    
    
    #Define Functions
    
    def fjob1(): 
        start = time.process_time()
        if not os.path.exists(ORIGINALPATH+job1+X):
            print ("No "+job1+" found or already processed")
        else:
            RFSDdf = pd.read_excel(ORIGINALPATH+job1+X,header=2,engine='openpyxl')
            RFSDdf.to_csv(CONVERTED+job1+"/"+job1+DT+V, index=False)
            os.rename(ORIGINALPATH+job1+X, PROCESSED+job1+"/"+job1+DT+X)
            print(job1+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
    
    def fjob2(): 
        start = time.process_time()
        if not os.path.exists(ORIGINALPATH+job2+X):
            print ("No "+job2+" found or already processed")
        else:
            TRSFDdf = pd.read_excel(ORIGINALPATH+job2+X,header=2,engine='openpyxl')
            TRSFDdf.to_csv(CONVERTED+job2+"/"+job2+DT+V, index=False)
            os.rename(ORIGINALPATH+job2+X, PROCESSED+job2+"/"+job2+DT+X)
            print(job2+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
    
    def fjob3(): 
        start = time.process_time()
        if not os.path.exists(ORIGINALPATH+job3+X):
            print ("No "+job3+" found or already processed")
        else:
            CSAdf = pd.read_excel(ORIGINALPATH+job3+X,header=2,engine='openpyxl')
            CSAdf.to_csv(CONVERTED+job3+"/"+job3+DT+V, index=False)
            os.rename(ORIGINALPATH+job3+X, PROCESSED+job3+"/"+job3+DT+X)
            print(job3+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
    
    def fjob4(): 
        start = time.process_time()
        if not os.path.exists(ORIGINALPATH+job4+X):
            print ("No "+job4+" found or already processed")
        else:
            CSATdf = pd.read_excel(ORIGINALPATH+job4+X,header=2,engine='openpyxl')
            CSATdf.to_csv(CONVERTED+job4+"/"+job4+DT+V, index=False)
            os.rename(ORIGINALPATH+job4+X, PROCESSED+job4+"/"+job4+DT+X)
            print(job4+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
    
    def fjob5(): 
        start = time.process_time()
        if not os.path.exists(ORIGINALPATH+job5+X):
            print ("No "+job5+" found or already processed")
        else:
            RCRdf = pd.read_excel(ORIGINALPATH+job5+X,header=2,engine='openpyxl')
            RCRdf.to_csv(CONVERTED+job5+"/"+job5+DT+V, index=False)
            os.rename(ORIGINALPATH+job5+X, PROCESSED+job5+"/"+job5+DT+X)
            print(job5+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
    
    def fjob6(): 
        start = time.process_time()
        if not os.path.exists(ORIGINALPATH+job6+X):
            print ("No "+job6+" found or already processed")
        else:
            AHTdf = pd.read_excel(ORIGINALPATH+job6+X,header=2,engine='openpyxl')
            AHTdf.to_csv(CONVERTED+job6+"/"+job6+DT+V, index=False)
            os.rename(ORIGINALPATH+job6+X, PROCESSED+job6+"/"+job6+DT+X)
            print(job6+' Processed in '+str(round(time.process_time() - start,1))+' seconds')

Starting functions, method one

    if __name__ == '__main__':
        p1 = Process(target=fjob1)
        p1.start()
        p2 = Process(target=fjob2)
        p2.start()
        p3 = Process(target=fjob3)
        p3.start()
        p4 = Process(target=fjob4)
        p4.start()
        p5 = Process(target=fjob5)
        p5.start()
        p6 = Process(target=fjob6)
        p6.start()
    
    print('All processed in '+str(round(time.process_time() - OverallStart,1))+' seconds')

Method 1 Output

    All processed in 0.0 seconds
    All processed in 0.0 seconds
    All processed in 0.0 seconds
    All processed in 0.0 seconds
    All processed in 0.0 seconds
    All processed in 0.0 seconds
    All processed in 0.0 seconds
    job3 Processed in 11.0 seconds
    job4 Processed in 12.0 seconds
    job5 Processed in 14.0 seconds
    job6 Processed in 17.0 seconds
    job2 Processed in 41.0 seconds
    job1 Processed in 58.0 seconds

All the jobs run concurrently and finish in the times shown above, but the 'All processed' messages are printed as soon as the functions are called rather than after they complete.

Starting functions, method two

    if __name__ == '__main__':
        with Pool() as pool:
            p1 = pool.map_async(fjob1,range(1))
            p2 = pool.map_async(fjob2,range(1))
            p3 = pool.map_async(fjob3,range(1))
            p4 = pool.map_async(fjob4,range(1))
            p5 = pool.map_async(fjob5,range(1))
            p6 = pool.map_async(fjob6,range(1))
            print('All processed in '+str(round(time.process_time() - OverallStart,1))+' seconds')

Method 2 output

    All processed in 0.0 seconds

The script completes as soon as it starts, and no file conversions take place.

Desired output:

    job3 Processed in 11.0 seconds
    job4 Processed in 12.0 seconds
    job5 Processed in 14.0 seconds
    job2 Processed in 41.0 seconds
    job6 Processed in 17.0 seconds
    job1 Processed in 58.0 seconds
    All processed in 58.0 seconds

Firstly, I want to measure the final run time after all of the functions complete.

Secondly, I want job1, job2, and job3 to start at the same time; job4 will wait till job3 completes, job5 will wait till job4 completes, and job6 will wait till job5 completes.

Upvotes: 0

Views: 93

Answers (1)

Booboo

Reputation: 44283

I would create a multiprocessing pool of size 3. I would then submit the two slow-running tasks and wait for them to start by using a multiprocessing.Barrier instance. The Barrier instance is initialized with parties=3. The two slow tasks and the main process then wait on the Barrier instance: all 3 processes block until every one of them has issued the wait, and then all 3 are released. In this way the main process will not continue with submitting the fast-running tasks until it knows for certain that the 2 slow-running tasks are running in 2 of the pool's processes. I would then submit the 4 fast-running tasks, which will share the single remaining pool process.

    import time

    def init_pool_processes(b):
        global barrier

        barrier = b

    def slow_task(i):
        t = time.time()
        barrier.wait()
        print('slow task', i, 'started', flush=True)

        # Emulate doing something useful:
        s = 0
        for _ in range(200_000_000):
            s += 1

        print('slow task', i, 'ended.', 'running time =', time.time() - t, flush=True)

    def fast_task(i):
        t = time.time()
        print('fast task', i, 'started', flush=True)

        # Emulate doing something useful:
        s = 0
        for _ in range(30_000_000):
            s += 1

        print('fast task', i, 'ended.', 'running time =', time.time() - t, flush=True)

    if __name__ == '__main__':
        from multiprocessing import Pool, Barrier

        barrier = Barrier(parties=3)

        pool = Pool(3, initializer=init_pool_processes, initargs=(barrier,))

        t = time.time()
        # Submit the slow tasks:
        for i in (1, 2):
            pool.apply_async(slow_task, args=(i,))

        # Wait for the slow tasks to start:
        barrier.wait()

        # Now submit the 4 fast tasks, which will run one after another
        # in the single remaining pool process:
        for i in (1, 2, 3, 4):
            pool.apply_async(fast_task, args=(i,))

        # Wait for all tasks to complete:
        pool.close()
        pool.join()
        print('Total running time =', time.time() - t)

Prints:

    slow task 2 started
    slow task 1 started
    fast task 1 started
    fast task 1 ended. running time = 1.7409968376159668
    fast task 2 started
    fast task 2 ended. running time = 1.789001703262329
    fast task 3 started
    fast task 3 ended. running time = 1.90700101852417
    fast task 4 started
    fast task 4 ended. running time = 1.8340206146240234
    slow task 1 ended. running time = 12.06902003288269
    slow task 2 ended. running time = 12.413990020751953
    Total running time = 12.558000326156616
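Applied to the question's own functions, the same idea can be simplified further: because the four quick jobs must run one after the other anyway, they can be wrapped in a single task, so only three tasks are ever submitted and the Barrier is not needed. The following is only a minimal sketch, assuming the fjob1 to fjob6 definitions from the question; note that it times with wall-clock time.time(), since time.process_time() counts only the CPU time of the current process, which is why the question's code prints 0.0 seconds.

    from multiprocessing import Pool
    import time

    def quick_chain():
        # job3 -> job4 -> job5 -> job6, one after the other in a single process
        fjob3()
        fjob4()
        fjob5()
        fjob6()

    if __name__ == '__main__':
        overall_start = time.time()
        with Pool(3) as pool:
            r1 = pool.apply_async(fjob1)        # slow task, gets its own process
            r2 = pool.apply_async(fjob2)        # slow task, gets its own process
            r3 = pool.apply_async(quick_chain)  # the four quick tasks, run sequentially
            # Block until every task has finished (get() also re-raises worker errors):
            for r in (r1, r2, r3):
                r.get()
        print('All processed in ' + str(round(time.time() - overall_start, 1)) + ' seconds')

This keeps at most 3 worker processes busy, which leaves headroom on a 4-core machine.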

Upvotes: 0
