Reputation: 11
I am trying to run 6 Python processes: 4 finish super quick, 2 take a bit longer. Asking now so it will not be a follow-up question.
I want to run the 4 quick tasks one after the other, and run that batch alongside the 2 that take longer. All the functions are quite different from each other and are not iterative.
The code below works, but it will use 6 cores. I need to consider that my users will only have 4 cores, and I must not slow their machines to a crawl.
I hope the brackets explain what I would like to achieve: p1 + p2 + (p3 -> p4 -> p5 -> p6)
At the very end I would like to print the completion time. Right now it prints the completion time as the processes start, once for each job, instead of waiting for all the jobs to complete and printing once.
I have attempted async. Before I even reached the point of splitting up the jobs, I ran into the issue where it seems to complete but does no work at all (the files are not processed), although it does print the completion message once at the end, which is what I am looking for.
All the help I find online shows iterative processes and treats all the jobs as equal. I am also quite fresh with Python and could likely find my answer if I knew the correct words for my problem.
if __name__ == '__main__':
    p1 = Process(target=job1)
    p1.start()
    p2 = Process(target=job2)
    p2.start()
    p3 = Process(target=job3)
    p3.start()
    p4 = Process(target=job4)
    p4.start()
    p5 = Process(target=job5)
    p5.start()
    p6 = Process(target=job6)
    p6.start()
    print('All processed in '+str(round(time.process_time() - OverallStart,1))+' seconds')
if __name__ == '__main__':
    with Pool() as pool:
        p1 = pool.map_async(job1,range(1))
        p2 = pool.map_async(job2,range(1))
        p3 = pool.map_async(job3,range(1))
        p4 = pool.map_async(job4,range(1))
        p5 = pool.map_async(job5,range(1))
        p6 = pool.map_async(job6,range(1))
    print('All processed in '+str(round(time.process_time() - OverallStart,1))+' seconds')
Process does the work but prints at the wrong time, and map_async doesn't do the work at all.
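From what I can tell, there are likely two separate issues here: time.process_time() only measures CPU time of the process that calls it (so the parent prints roughly 0 straight away and never includes the children's work), and nothing ever waits on the processes or on the map_async results before printing. On top of that, the fjob functions in the full listing below take no arguments, yet map_async calls them with an item from range(1), so they would raise a TypeError inside the workers that is never surfaced because .get() is never called, and leaving the with Pool() block terminates the pool without waiting. A minimal sketch of the waiting/timing part I think I need (this still starts all 6 jobs at once, so it does not yet solve the grouping, and it assumes the fjob1..fjob6 functions from the full listing below):
import time
from multiprocessing import Process

if __name__ == '__main__':
    overall_start = time.perf_counter()   # wall-clock time, unlike process_time()
    procs = [Process(target=f) for f in (fjob1, fjob2, fjob3, fjob4, fjob5, fjob6)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()                          # block until every job has finished
    print('All processed in ' + str(round(time.perf_counter() - overall_start, 1)) + ' seconds')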
Adding the full set of code, comments and desired results
# import required modules
import os
import pandas as pd
import warnings
from datetime import date
import time
from multiprocessing import Process
from multiprocessing.pool import Pool
warnings.simplefilter("ignore")
OverallStart = time.process_time()
# Get User Paths
ORIGINALPATH = "c:/OriginalPATH"
FINALPATH = "c:/FinalPATH/"
CONVERTED = FINALPATH+"Converted/"
PROCESSED = FINALPATH+"Processed/"
#Filenames
job1 = "Refused Details"
job2 = "CSA Details"
job3 = "CSAT Details"
job4 = "RCR Details"
job5 = "AHT Details"
job6 = "Transferred Details"
C = "Converted"
D = "Processed"
X = ".xlsx"
V = ".csv"
DT = date.today().strftime("_%d_%m_%Y")
#Define Functions
def fjob1():
    start = time.process_time()
    if not os.path.exists(ORIGINALPATH+job1+X):
        print("No "+job1+" found or already processed")
    else:
        RFSDdf = pd.read_excel(ORIGINALPATH+job1+X,header=2,engine='openpyxl')
        RFSDdf.to_csv(CONVERTED+job1+"/"+job1+DT+V, index=False)
        os.rename(ORIGINALPATH+job1+X, PROCESSED+job1+"/"+job1+DT+X)
        print(job1+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
def fjob2():
    start = time.process_time()
    if not os.path.exists(ORIGINALPATH+job2+X):
        print("No "+job2+" found or already processed")
    else:
        TRSFDdf = pd.read_excel(ORIGINALPATH+job2+X,header=2,engine='openpyxl')
        TRSFDdf.to_csv(CONVERTED+job2+"/"+job2+DT+V, index=False)
        os.rename(ORIGINALPATH+job2+X, PROCESSED+job2+"/"+job2+DT+X)
        print(job2+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
def fjob3():
    start = time.process_time()
    if not os.path.exists(ORIGINALPATH+job3+X):
        print("No "+job3+" found or already processed")
    else:
        CSAdf = pd.read_excel(ORIGINALPATH+job3+X,header=2,engine='openpyxl')
        CSAdf.to_csv(CONVERTED+job3+"/"+job3+DT+V, index=False)
        os.rename(ORIGINALPATH+job3+X, PROCESSED+job3+"/"+job3+DT+X)
        print(job3+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
def fjob4():
    start = time.process_time()
    if not os.path.exists(ORIGINALPATH+job4+X):
        print("No "+job4+" found or already processed")
    else:
        CSATdf = pd.read_excel(ORIGINALPATH+job4+X,header=2,engine='openpyxl')
        CSATdf.to_csv(CONVERTED+job4+"/"+job4+DT+V, index=False)
        os.rename(ORIGINALPATH+job4+X, PROCESSED+job4+"/"+job4+DT+X)
        print(job4+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
def fjob5():
    start = time.process_time()
    if not os.path.exists(ORIGINALPATH+job5+X):
        print("No "+job5+" found or already processed")
    else:
        RCRdf = pd.read_excel(ORIGINALPATH+job5+X,header=2,engine='openpyxl')
        RCRdf.to_csv(CONVERTED+job5+"/"+job5+DT+V, index=False)
        os.rename(ORIGINALPATH+job5+X, PROCESSED+job5+"/"+job5+DT+X)
        print(job5+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
def fjob6():
    start = time.process_time()
    if not os.path.exists(ORIGINALPATH+job6+X):
        print("No "+job6+" found or already processed")
    else:
        AHTdf = pd.read_excel(ORIGINALPATH+job6+X,header=2,engine='openpyxl')
        AHTdf.to_csv(CONVERTED+job6+"/"+job6+DT+V, index=False)
        os.rename(ORIGINALPATH+job6+X, PROCESSED+job6+"/"+job6+DT+X)
        print(job6+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
Starting the functions, method one:
if __name__ == '__main__':
    p1 = Process(target=fjob1)
    p1.start()
    p2 = Process(target=fjob2)
    p2.start()
    p3 = Process(target=fjob3)
    p3.start()
    p4 = Process(target=fjob4)
    p4.start()
    p5 = Process(target=fjob5)
    p5.start()
    p6 = Process(target=fjob6)
    p6.start()
    print('All processed in '+str(round(time.process_time() - OverallStart,1))+' seconds')
Method 1 Output
All processed in 0.0 seconds
All processed in 0.0 seconds
All processed in 0.0 seconds
All processed in 0.0 seconds
All processed in 0.0 seconds
All processed in 0.0 seconds
All processed in 0.0 seconds
job3 Processed in 11.0 seconds
job4 Processed in 12.0 seconds
job5 Processed in 14.0 seconds
job6 Processed in 17.0 seconds
job2 Processed in 41.0 seconds
job1 Processed in 58.0 seconds
All jobs run concurrently and finish in the times shown above, but the 'All processed' line is printed as soon as all the functions have been started.
Starting the functions, method two:
if __name__ == '__main__':
    with Pool() as pool:
        p1 = pool.map_async(fjob1,range(1))
        p2 = pool.map_async(fjob2,range(1))
        p3 = pool.map_async(fjob3,range(1))
        p4 = pool.map_async(fjob4,range(1))
        p5 = pool.map_async(fjob5,range(1))
        p6 = pool.map_async(fjob6,range(1))
    print('All processed in '+str(round(time.process_time() - OverallStart,1))+' seconds')
Method 2 output
All processed in 0.0 seconds
The script reports completion as soon as it starts, and no file conversions take place.
Desired output
job3 Processed in 11.0 seconds
job4 Processed in 12.0 seconds
job5 Processed in 14.0 seconds
job2 Processed in 41.0 seconds
job6 Processed in 17.0 seconds
job1 Processed in 58.0 seconds
All processed in 58.0 seconds
Mostly, I want to measure the final run time after all the functions complete.
Secondly, I want job1, job2 and job3 to start at the same time; job4 will wait till job3 completes, job5 will wait till job4 completes, and job6 will wait till job5 completes.
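For clarity, this is roughly the shape I have in mind, as an untested sketch that reuses the fjob functions from the listing above: the four quick jobs run one after the other inside a single extra process, alongside the two slow jobs, so at most 3 processes are busy at any time, and everything is joined before the final print.
import time
from multiprocessing import Process

def quick_chain():
    # Run the four quick jobs strictly one after the other in this one process
    fjob3()
    fjob4()
    fjob5()
    fjob6()

if __name__ == '__main__':
    overall_start = time.perf_counter()
    p1 = Process(target=fjob1)        # slow job
    p2 = Process(target=fjob2)        # slow job
    p3 = Process(target=quick_chain)  # quick jobs 3 -> 4 -> 5 -> 6
    for p in (p1, p2, p3):
        p.start()
    for p in (p1, p2, p3):
        p.join()                      # wait for everything before timing
    print('All processed in ' + str(round(time.perf_counter() - overall_start, 1)) + ' seconds')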
Upvotes: 0
Views: 93
Reputation: 44283
I would create a multiprocessing pool of size 3. I would then submit the two slow-running tasks and wait for them to start by using a 'multiprocessing.Barrier' instance. The Barrier instance is initialized with parties=3. The two slow tasks and the main process then wait on the Barrier instance. The 3 processes will block until all of them have issued the wait, and then all 3 are released. In this way the main process will not continue with submitting the fast-running tasks until it knows for certain that the 2 slow-running tasks are running in 2 of the pool's processes. I would then submit the 4 fast-running tasks, which will run one after the other in the remaining pool process.
import time

def init_pool_processes(b):
    global barrier
    barrier = b

def slow_task(i):
    t = time.time()
    barrier.wait()
    print('slow task', i, 'started', flush=True)
    # Emulate doing something useful:
    s = 0
    for _ in range(200_000_000):
        s += 1
    print('slow task', i, 'ended.', 'running time =', time.time() - t, flush=True)

def fast_task(i):
    t = time.time()
    print('fast task', i, 'started', flush=True)
    s = 0
    for _ in range(30_000_000):
        s += 1
    print('fast task', i, 'ended.', 'running time =', time.time() - t, flush=True)

if __name__ == '__main__':
    from multiprocessing import Pool, Barrier

    barrier = Barrier(parties=3)
    pool = Pool(3, initializer=init_pool_processes, initargs=(barrier,))
    t = time.time()
    # Submit the slow tasks:
    for i in (1, 2):
        pool.apply_async(slow_task, args=(i,))
    # Wait for the slow tasks to start:
    barrier.wait()
    # Now submit the 4 fast tasks, which will run one after the
    # other in the remaining pool process:
    for i in (1, 2, 3, 4):
        pool.apply_async(fast_task, args=(i,))
    # Wait for all tasks to complete:
    pool.close()
    pool.join()
    print('Total running time =', time.time() - t)
Prints:
slow task 2 started
slow task 1 started
fast task 1 started
fast task 1 ended. running time = 1.7409968376159668
fast task 2 started
fast task 2 ended. running time = 1.789001703262329
fast task 3 started
fast task 3 ended. running time = 1.90700101852417
fast task 4 started
fast task 4 ended. running time = 1.8340206146240234
slow task 1 ended. running time = 12.06902003288269
slow task 2 ended. running time = 12.413990020751953
Total running time = 12.558000326156616
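Applied to the question's concrete jobs, the same pattern would presumably look something like the untested sketch below, where fjob1 and fjob2 are the two long-running jobs from the question, fjob3..fjob6 are the quick ones (assumed to be defined or imported at module level so the pool workers can find them), and a small wrapper does the barrier wait because the original functions take no arguments:
import time
from multiprocessing import Pool, Barrier

def init_pool_processes(b):
    global barrier
    barrier = b

def slow_job_wrapper(job):
    # Signal the main process that this slow job now occupies a pool process,
    # then run the actual job:
    barrier.wait()
    job()

if __name__ == '__main__':
    barrier = Barrier(parties=3)
    pool = Pool(3, initializer=init_pool_processes, initargs=(barrier,))
    overall_start = time.perf_counter()
    # The two slow jobs take two of the three pool processes:
    for job in (fjob1, fjob2):
        pool.apply_async(slow_job_wrapper, args=(job,))
    # Block until both slow jobs have started:
    barrier.wait()
    # Queue the four quick jobs; they are picked up in this order,
    # initially by the single remaining free pool process:
    for job in (fjob3, fjob4, fjob5, fjob6):
        pool.apply_async(job)
    pool.close()
    pool.join()    # wait for every job before reporting the overall time
    print('All processed in ' + str(round(time.perf_counter() - overall_start, 1)) + ' seconds')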
Upvotes: 0