Reputation: 2044
I want to run SAP reports in parallel using Python. I have figured out how to run everything in parallel when not sharing a pool but I cannot figure out how to create a pool to share resources.
Example:
Have 6 available sessions to run reports on, k = 1..6, but I have 8 reports to run. The first report should be given k=1, the second report k=2 and so on, but the 7th report needs to wait until one of the k's is available and then run on the first available k.
Below is my code:
import win32com.client
import os
import multiprocessing
from multiprocessing import Pool
from subprocess import call
import time
import datetime
maxSess = 6  # max number of sessions allowed by license
filePath = os.path.join(os.getcwd(), 'sap_files')


def _zmrosales_ship_month(k=0):
    """Run the ZMRO shipments-by-month report on SAP GUI session *k*.

    :param k: index of the SAP session to attach to (default 0).
    """
    name = multiprocessing.current_process().name
    print('%s Starting' % name)  # single-argument form works in py2 and py3
    # NOTE(review): SAP is assumed to be a module-global GUI handle created
    # elsewhere (e.g. by a _start_SAP helper) -- confirm before running.
    S = SAP.FindById("ses[" + str(k) + "]")
    # Code to run SAP script


def _zmrosales_inv_month(k=1):
    """Run the ZMRO invoiced-sales-by-month report on SAP GUI session *k*.

    :param k: index of the SAP session to attach to (default 1).
    """
    name = multiprocessing.current_process().name
    print('%s Starting' % name)
    # NOTE(review): see _zmrosales_ship_month -- SAP handle assumed global.
    S = SAP.FindById("ses[" + str(k) + "]")
    # Code to run SAP script


# N times more defs with SAP scripts ####

if __name__ == '__main__':
    # The original referenced an undefined global `k` inside each report;
    # pass the session index explicitly to each worker process instead.
    Shipments = multiprocessing.Process(
        name='Shipments', target=_zmrosales_ship_month, args=(0,))
    Invoiced = multiprocessing.Process(
        name='Invoiced', target=_zmrosales_inv_month, args=(1,))
    Shipments.start()
    Invoiced.start()
Any help would be greatly appreciated!
I am using python 2.7
Updated code based on the comments below (still not processing correctly: it currently uses the same i from the manager list for both functions. I need the first function to use i = 0 and the second to use i = 1, and at the end of each function to append the i back to the manager list.)
import win32com.client
import os
import multiprocessing
from multiprocessing import Pool
from subprocess import call
import time
import datetime
import contextlib
maxSess = 6  # max number of sessions allowed by license
filePath = os.path.join(os.getcwd(), 'sap_files')


def _start_SAP():
    """Start the SAP GUI and log in.  Body omitted in the original post."""
    # Code to start SAP
    pass


def _zmrosales_ship_month(k):
    """Run the shipments report on SAP session *k* (checked out by the caller).

    :param k: session index popped from the shared manager list.
    """
    # NOTE(review): SAP is assumed to be a global GUI handle -- confirm.
    S = SAP.FindById("ses[" + str(k) + "]")
    # SAP script
    session_ids.append(k)  # return the session to the shared pool


def _zmrosales_inv_month(k):
    """Run the invoiced-sales report on SAP session *k*."""
    S = SAP.FindById("ses[" + str(k) + "]")
    # SAP script
    session_ids.append(k)


# N times more defs with SAP scripts ####

if __name__ == '__main__':
    # Was multiprocessing.freeze.support(): AttributeError at runtime.
    multiprocessing.freeze_support()
    # Manager was used bare but never imported; qualify it explicitly.
    with multiprocessing.Manager() as manager:
        # range(maxSess), not range(maxSess - 1): the original silently
        # dropped one licensed session.  Also avoid shadowing builtin `list`.
        session_ids = manager.list(range(maxSess))
        with contextlib.closing(Pool(maxSess)) as pool:
            pool.apply(_start_SAP)
            # Check one session id out of the shared list per report (the
            # original popped once into `I` but passed an undefined `i` to
            # both tasks).
            pool.apply_async(_zmrosales_ship_month, [session_ids.pop(0)])
            pool.apply_async(_zmrosales_inv_month, [session_ids.pop(0)])
            pool.close()
            pool.join()
Edited for Final Answer
I was not able to get the code provided below to work for my situation, but the logic and thought process made sense and it will probably help someone else, so I marked it as correct.
I have found a solution to my issue however, the code is below. It is a different approach using queue versus manager and pool.
import multiprocessing
from multiprocessing import Manager, Process
import win32com.client
import os
from subprocess import call
import time
import datetime
def _start_SAP():
def test1(q, lock):
print 'starting test1 report'
while True: # logic to get shared queue
if not q.empty():
lock.acquire()
k = q.get()
lock.release()
break
else:
print 'waiting for available queue for test1 shipments'
time.sleep(15)
time.sleep(30)
q.put(k) # adding k back into the shared queue
print 'finished test1 report'
def test2(q, lock):
print 'starting test2 report'
while True: # logic to get shared queue
if not q.empty():
lock.acquire()
k = q.get()
lock.release()
break
else:
print 'waiting for available queue for test2 shipments'
time.sleep(30)
time.sleep(30)
q.put(k) # adding k back into the shared queue
print 'finished test2 report'
def test3(q, lock):
print 'starting test3 report'
while True: # logic to get shared queue
if not q.empty():
lock.acquire()
k = q.get()
lock.release()
break
else:
print 'waiting for available queue for test3 shipments'
time.sleep(20)
time.sleep(30)
q.put(k) # adding k back into the shared queue
print 'finished test3 report'
def test4(q, lock):
print 'starting test4 report'
while True: # logic to get shared queue
if not q.empty():
lock.acquire()
k = q.get()
lock.release()
break
else:
print 'waiting for available queue for test4 shipments'
time.sleep(45)
time.sleep(30)
q.put(k) # adding k back into the shared queue
print 'finished test4 report'
def test5(q, lock):
print 'starting test5 report'
while True: # logic to get shared queue
if not q.empty():
lock.acquire()
k = q.get()
lock.release()
break
else:
print 'waiting for available queue for test5 shipments'
time.sleep(10)
time.sleep(30)
q.put(k) # adding k back into the shared queue
print 'finished test5 report'
def _end_SAP():
if __name__ == '__main__':
    lock = multiprocessing.Lock()  # guards the shared queue check-out
    shared_list = range(6)  # session ids 0..5 shared by all reports
    q = multiprocessing.Queue()  # pool of free SAP session ids
    for n in shared_list:  # putting list into the queue
        q.put(n)
    print('loaded queue to start the program')

    # Start SAP once, up front, and wait for it before launching reports.
    StartSAP = Process(target=_start_SAP)
    StartSAP.start()
    StartSAP.join()

    Test1 = Process(target=test1, args=(q, lock))
    Test2 = Process(target=test2, args=(q, lock))
    Test3 = Process(target=test3, args=(q, lock))
    Test4 = Process(target=test4, args=(q, lock))
    Test5 = Process(target=test5, args=(q, lock))
    Test1.start()
    Test2.start()
    Test3.start()
    Test4.start()
    Test5.start()
    Test1.join()
    Test2.join()
    Test3.join()
    Test4.join()
    Test5.join()

    # Was Process(target=_close_SAP): NameError -- the function defined
    # above is _end_SAP.
    EndSAP = Process(target=_end_SAP)
    EndSAP.start()
    EndSAP.join()

    # Drain and show the session ids left in the queue.
    while q.empty() is False:
        print(q.get())
Upvotes: 3
Views: 5517
Reputation: 1608
You can adopt following pseudo-code to achieve desired result:
from multiprocessing.pool import Pool
import multiprocessing
# A Manager-backed list visible to every pool worker.
# NOTE(review): created at import time; under the 'spawn' start method every
# child re-imports this module and builds its own manager -- fine with
# 'fork', confirm on Windows.
shared_list = multiprocessing.Manager().list()


def pool_function(i):
    """Record which pool worker handled task *i* in the shared list."""
    shared_list.append([multiprocessing.current_process().name, i])


if __name__ == '__main__':
    # The pool must be guarded behind __main__: without the guard,
    # spawn-based platforms re-execute this code in every worker process.
    with Pool(6) as pool:
        for i in range(8):
            pool.apply(
                pool_function,
                args=(i, )
            )
    print(shared_list)
Output:
[
['ForkPoolWorker-2', 0],
['ForkPoolWorker-5', 1],
['ForkPoolWorker-3', 2],
['ForkPoolWorker-4', 3],
['ForkPoolWorker-2', 4],
['ForkPoolWorker-6', 5],
['ForkPoolWorker-7', 6],
['ForkPoolWorker-5', 7]
]
Merged codes:
import win32com.client
import os
import multiprocessing
from multiprocessing import Pool
from subprocess import call
import time
import datetime
# Define shared resources using multiprocessing.Manager()
# FOLLOWING IS JUST FOR EXAMPLE PURPOSES
# NOTE(review): created at import time -- under the 'spawn' start method each
# worker re-imports the module and builds its own manager; fine with 'fork'.
resource_manager = multiprocessing.Manager()
shared_list = resource_manager.list()
shared_dict = resource_manager.dict()

maxSess = 6  # max number of sessions allowed by license
filePath = os.path.join(os.getcwd(), 'sap_files')


def _zmrosales_ship_month(k=0):
    """Run the shipments report on SAP session *k*.

    :param k: session index (the original referenced an undefined global).
    """
    name = multiprocessing.current_process().name
    print('%s Starting' % name)  # single-argument form works in py2 and py3
    # NOTE(review): SAP is assumed to be a global GUI handle created
    # elsewhere -- confirm before running.
    S = SAP.FindById("ses[" + str(k) + "]")
    # Code to run SAP script


def _zmrosales_inv_month(k=1):
    """Run the invoiced-sales report on SAP session *k*."""
    name = multiprocessing.current_process().name
    print('%s Starting' % name)
    S = SAP.FindById("ses[" + str(k) + "]")
    # Code to run SAP script


# N times more defs with SAP scripts ####

if __name__ == '__main__':
    # Pool became a context manager only in Python 3.3; the original mixed
    # `with Pool(...)` with py2-only print statements, so it ran under no
    # interpreter.  This version is py3-consistent.
    with Pool(maxSess) as pool:
        # apply_async results are discarded, so worker exceptions vanish
        # silently; call .get() on the AsyncResult (or use apply) to surface
        # them.
        pool.apply_async(
            _zmrosales_ship_month
        )
        pool.apply_async(
            _zmrosales_inv_month
        )
        pool.close()
        pool.join()
If you need functions to execute sequentially - replace apply_async
with apply
or add .get()
to each call ( like pool.apply_async(f).get()
)
For more information about shared resources in multiprocessing - see the managers reference
FINAL ANSWER:
import contextlib
import datetime
import multiprocessing
import os
import time
from multiprocessing import Pool
from subprocess import call
import win32com.client
maxSess = 6  # max number of sessions allowed by license
num_reports = 8
filePath = os.path.join(os.getcwd(), 'sap_files')


def _start_SAP():
    """Simulate starting SAP on this worker's dedicated session."""
    k = shared_dict[multiprocessing.current_process().name]
    print(multiprocessing.current_process().name, '_start_SAP', k)
    # Was `while True: pass`: that busy-spun this worker forever, so the
    # task never completed and pool.join() below never returned.  Simulate
    # a bounded start-up instead.
    time.sleep(1)


def _zmrosales_ship_month(i):
    """Run shipments report *i* on this worker's dedicated SAP session."""
    k = shared_dict[multiprocessing.current_process().name]
    print(multiprocessing.current_process().name, '_zmrosales_ship_month', k, i)
    time.sleep(1)
    # S = SAP.FindById("ses[" + str(k) + "]") ## I need k to share the pool resource
    # SAP script


def _zmrosales_inv_month(i):
    """Run invoiced-sales report *i* on this worker's dedicated SAP session."""
    k = shared_dict[multiprocessing.current_process().name]
    print(multiprocessing.current_process().name, '_zmrosales_inv_month', k, i)
    time.sleep(1)
    # S = SAP.FindById("ses[" + str(k) + "]") ## I need k to share the pool resource
    # SAP script


# N times more defs with SAP scripts ####

def worker_init(shared_dict, shared_list):
    """
    Map this worker process's name to a dedicated SAP session id k.

    :param shared_dict: multiprocessing.Manager().dict() -- process name -> k
    :param shared_list: multiprocessing.Manager().list() of free session ids
    """
    shared_dict[multiprocessing.current_process().name] = shared_list.pop(0)


if __name__ == '__main__':
    # Was multiprocessing.freeze.support(): AttributeError at runtime.
    multiprocessing.freeze_support()
    with multiprocessing.Manager() as manager:
        shared_list = manager.list(range(maxSess))
        shared_dict = manager.dict()
        p = Pool(
            maxSess,                          # count of workers
            initializer=worker_init,          # each worker calls this on spawn
            initargs=(shared_dict, shared_list,)  # arguments for initializer
        )
        with contextlib.closing(p) as pool:
            pool.apply_async(_start_SAP)
            for i in range(num_reports):
                pool.apply_async(_zmrosales_ship_month, args=(i,))
                pool.apply_async(_zmrosales_inv_month, args=(i,))
            p.close()
            p.join()
OUTPUT:
ForkPoolWorker-2 _start_SAP 0
ForkPoolWorker-3 _zmrosales_ship_month 1 0
ForkPoolWorker-4 _zmrosales_inv_month 3 0
ForkPoolWorker-7 _zmrosales_ship_month 2 1
ForkPoolWorker-5 _zmrosales_inv_month 4 1
ForkPoolWorker-6 _zmrosales_ship_month 5 2
ForkPoolWorker-3 _zmrosales_inv_month 1 2
ForkPoolWorker-4 _zmrosales_ship_month 3 3
ForkPoolWorker-7 _zmrosales_inv_month 2 3
ForkPoolWorker-5 _zmrosales_ship_month 4 4
ForkPoolWorker-6 _zmrosales_inv_month 5 4
ForkPoolWorker-3 _zmrosales_ship_month 1 5
ForkPoolWorker-4 _zmrosales_inv_month 3 5
ForkPoolWorker-7 _zmrosales_ship_month 2 6
ForkPoolWorker-5 _zmrosales_inv_month 4 6
ForkPoolWorker-6 _zmrosales_ship_month 5 7
ForkPoolWorker-3 _zmrosales_inv_month 1 7
Upvotes: 1