Kevin

Reputation: 2044

Multiprocessing Pool: Python

I want to run SAP reports in parallel using Python. I have figured out how to run everything in parallel when not sharing a pool, but I cannot figure out how to create a pool to share resources.

Example:

I have 6 available sessions to run reports on (k = 1..6), but I have 8 reports to run. The first report should be given k=1, the second k=2, and so on; the 7th report needs to wait until one of the k's is available and then run on the first available k.
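
In other words, the behaviour I am after is something like the sketch below (dummy sleeps instead of the real SAP calls, session numbers handed out from a shared queue):

import multiprocessing
import time

def run_report(report_id, sessions):
    k = sessions.get()        # blocks until one of the 6 session numbers is free
    print('report %d running on session %d' % (report_id, k))
    time.sleep(1)             # placeholder for the actual SAP script
    sessions.put(k)           # hand the session number back for the next report

if __name__ == '__main__':
    sessions = multiprocessing.Queue()
    for k in range(6):        # the 6 available sessions
        sessions.put(k)

    reports = [multiprocessing.Process(target=run_report, args=(n, sessions))
               for n in range(8)]  # the 8 reports
    for p in reports:
        p.start()
    for p in reports:
        p.join()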

Below is my code:

import win32com.client
import os
import multiprocessing
from multiprocessing import Pool
from subprocess import call
import time
import datetime

maxSess = 6  # max number of sessions allowed by license
filePath = os.path.join(os.getcwd(), 'sap_files')

def _zmrosales_ship_month():
    name = multiprocessing.current_process().name
    print name, 'Starting'
    S = SAP.FindById("ses[" + str(k) + "]")
    #Code to run SAP script

def _zmrosales_inv_month():
    name = multiprocessing.current_process().name
    print name, 'Starting'
    S = SAP.FindById("ses[" + str(k) + "]")
    #Code to run SAP script

#### N times more def's with SAP scripts ####

if __name__ == '__main__':
    Shipments = multiprocessing.Process(name='Shipments', target=_zmrosales_ship_month)
    Invoiced = multiprocessing.Process(name='Invoiced', target=_zmrosales_inv_month)

    Shipments.start()
    Invoiced.start()

Any help would be greatly appreciated!

I am using python 2.7

Updated code based on the comments below (still not processing correctly: it currently uses the same i from the manager list for both functions. I need the first function to use i = 0 and the second to use i = 1, and then, at the end of each function, to append the i back to the manager list)

import win32com.client
import os
import multiprocessing
from multiprocessing import Pool
from subprocess import call
import time
import datetime
import contextlib

maxSess = 6  # max number of sessions allowed by license
filePath = os.path.join(os.getcwd(), 'sap_files')

def _start_SAP():
    # Code to start SAP

def _zmrosales_ship_month(k):
    S = SAP.FindById("ses[" + str(k) + "]")  ## I need k to share the pool resource
    # SAP script
    list.append(k)

def _zmrosales_inv_month(k):
    S = SAP.FindById("ses[" + str(k) + "]")  ## I need k to share the pool resource
    # SAP script
    list.append(k)

#### N times more def's with SAP scripts ####

if __name__ == '__main__':
    multiprocessing.freeze_support()
    with multiprocessing.Manager() as manager:
        list = manager.list(range(maxSess - 1))
        I = list.pop(0)
        with contextlib.closing(Pool(maxSess)) as pool:
            pool.apply(_start_SAP)
            pool.apply_async(_zmrosales_ship_month,[i])
            pool.apply_async(_zmrosales_inv_month, [i])
            pool.close()
            pool.join()

Edited for Final Answer

I was not able to get the code provided below to work for my situation, but the logic and thought process made sense and will probably help someone else, so I marked it as correct.

I have found a solution to my issue, however; the code is below. It is a different approach, using a queue instead of a manager and pool.

import multiprocessing
from multiprocessing import Manager, Process
import win32com.client
import os
from subprocess import call
import time
import datetime

def _start_SAP():
    pass  # Code to start SAP

def test1(q, lock):

    print 'starting test1 report'

    while True:  # logic to get shared queue
        if not q.empty():
            lock.acquire()
            k = q.get()
            lock.release()
            break
        else:
            print 'waiting for available queue for test1 shipments'
            time.sleep(15)

    time.sleep(30)
    q.put(k)  # adding k back into the shared queue
    print 'finished test1 report'


def test2(q, lock):
    print 'starting test2 report'

    while True:  # logic to get shared queue
        if not q.empty():
            lock.acquire()
            k = q.get()
            lock.release()
            break
        else:
            print 'waiting for available queue for test2 shipments'
            time.sleep(30)

    time.sleep(30)
    q.put(k)  # adding k back into the shared queue
    print 'finished test2 report'


def test3(q, lock):
    print 'starting test3 report'

    while True:  # logic to get shared queue
        if not q.empty():
            lock.acquire()
            k = q.get()
            lock.release()
            break
        else:
            print 'waiting for available queue for test3 shipments'
            time.sleep(20)

    time.sleep(30)
    q.put(k)  # adding k back into the shared queue
    print 'finished test3 report'


def test4(q, lock):
    print 'starting test4 report'

    while True:  # logic to get shared queue
        if not q.empty():
            lock.acquire()
            k = q.get()
            lock.release()
            break
        else:
            print 'waiting for available queue for test4 shipments'
            time.sleep(45)

    time.sleep(30)
    q.put(k)  # adding k back into the shared queue
    print 'finished test4 report'


def test5(q, lock):
    print 'starting test5 report'

    while True:  # logic to get shared queue
        if not q.empty():
            lock.acquire()
            k = q.get()
            lock.release()
            break
        else:
            print 'waiting for available queue for test5 shipments'
            time.sleep(10)

    time.sleep(30)
    q.put(k)  # adding k back into the shared queue
    print 'finished test5 report'

def _end_SAP():
    pass  # Code to end SAP


if __name__ == '__main__':

    lock = multiprocessing.Lock()  # creating a lock in multiprocessing

    shared_list = range(6)  # session numbers used to fill the shared queue
    q = multiprocessing.Queue()  # creating an empty queue in mulitprocessing
    for n in shared_list:  # putting list into the queue
        q.put(n)
    print 'loaded queue to start the program'

    StartSAP = Process(target=_start_SAP)
    StartSAP.start()
    StartSAP.join()

    Test1 = Process(target=test1, args=(q, lock))
    Test2 = Process(target=test2, args=(q, lock))
    Test3 = Process(target=test3, args=(q, lock))
    Test4 = Process(target=test4, args=(q, lock))
    Test5 = Process(target=test5, args=(q, lock))

    Test1.start()
    Test2.start()
    Test3.start()
    Test4.start()
    Test5.start()
    Test1.join()
    Test2.join()
    Test3.join()
    Test4.join()
    Test5.join()

    EndSAP = Process(target=_end_SAP)
    EndSAP.start()
    EndSAP.join()

    while not q.empty():
        print(q.get())
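
One note on the pattern above: multiprocessing.Queue.get() blocks by default until an item is available, so the empty-check/sleep loop (and the lock around the get) isn't strictly necessary; each report function could be reduced to something like this sketch:

def test1(q, lock):  # lock kept only so the Process(...) call signature stays the same
    print 'starting test1 report'
    k = q.get()     # blocks until one of the session numbers is available
    time.sleep(30)  # placeholder for the SAP script
    q.put(k)        # return the session number to the queue
    print 'finished test1 report'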

Upvotes: 3

Views: 5517

Answers (1)

Yaroslav Surzhikov

Reputation: 1608

You can adapt the following pseudo-code to achieve the desired result:

from multiprocessing.pool import Pool
import multiprocessing

shared_list = multiprocessing.Manager().list()


def pool_function(i):
    shared_list.append([multiprocessing.current_process().name, i])


with Pool(6) as pool:
    for i in range(8):
        pool.apply(
            pool_function,
            args=(i, )
        )


print(shared_list)

Output:

[
    ['ForkPoolWorker-2', 0],
    ['ForkPoolWorker-5', 1],
    ['ForkPoolWorker-3', 2],
    ['ForkPoolWorker-4', 3],
    ['ForkPoolWorker-2', 4],
    ['ForkPoolWorker-6', 5],
    ['ForkPoolWorker-7', 6],
    ['ForkPoolWorker-5', 7]
]

Merged codes:

import win32com.client
import os
import multiprocessing
from multiprocessing import Pool
from subprocess import call
import time
import datetime

# Define shared resources using multiprocessing.Manager()

resource_manager = multiprocessing.Manager()

# FOLLOWING IS JUST FOR EXAMPLE PURPOSES
shared_list = resource_manager.list()
shared_dict = resource_manager.dict()

maxSess = 6  # max number of sessions allowed by license
filePath = os.path.join(os.getcwd(), 'sap_files')

def _zmrosales_ship_month():
    name = multiprocessing.current_process().name
    print name, 'Starting'
    S = SAP.FindById("ses[" + str(k) + "]")
    #Code to run SAP script

def _zmrosales_inv_month():
    name = multiprocessing.current_process().name
    print name, 'Starting'
    S = SAP.FindById("ses[" + str(k) + "]")
    #Code to run SAP script

#### N times more def's with SAP scripts ####

if __name__ == '__main__':
    with Pool(maxSess) as pool:
        pool.apply_async(
            _zmrosales_ship_month
        )
        pool.apply_async(
            _zmrosales_inv_month
        )
        pool.close()
        pool.join()

If you need the functions to execute sequentially, replace apply_async with apply or add .get() to each call (like pool.apply_async(f).get()).
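
For example, a minimal illustration with a stand-in function f (not the SAP functions):

from multiprocessing import Pool

def f(x):  # stand-in function, not one of the SAP report functions
    return x * x

if __name__ == '__main__':
    pool = Pool(2)
    print(pool.apply(f, (3,)))          # apply blocks until f(3) returns -> prints 9
    result = pool.apply_async(f, (4,))  # apply_async returns an AsyncResult immediately
    print(result.get())                 # .get() blocks until the result is ready -> prints 16
    pool.close()
    pool.join()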

For more information about shared resources in multiprocessing, see the managers reference.

FINAL ANSWER:

import contextlib
import datetime
import multiprocessing
import os
import time
from multiprocessing import Pool
from subprocess import call

import win32com.client

maxSess = 6  # max number of sessions allowed by license
num_reports = 8
filePath = os.path.join(os.getcwd(), 'sap_files')


def _start_SAP():
    k = shared_dict[multiprocessing.current_process().name]
    print(multiprocessing.current_process().name, '_start_SAP', k)
    while True:
        pass


def _zmrosales_ship_month(i):
    k = shared_dict[multiprocessing.current_process().name]
    print(multiprocessing.current_process().name, '_zmrosales_ship_month', k, i)
    time.sleep(1)
    # S = SAP.FindById("ses[" + str(k) + "]") ## I need k to share the pool resource
    # SAP script


def _zmrosales_inv_month(i):
    k = shared_dict[multiprocessing.current_process().name]
    print(multiprocessing.current_process().name, '_zmrosales_inv_month', k, i)
    time.sleep(1)

    # S = SAP.FindById("ses[" + str(k) + "]") ## I need k to share the pool resource
    # SAP script


# N times more def's with SAP scripts ####

def worker_init(shared_dict, shared_list):
    """
    Map processes names to k
    :param shared_dict: multiprocessing.Manager().dict()
    :param shared_list: multiprocessing.Manager().list()
    """
    shared_dict[multiprocessing.current_process().name] = shared_list.pop(0)


if __name__ == '__main__':

    multiprocessing.freeze_support()

    with multiprocessing.Manager() as manager:
        shared_list = manager.list(range(maxSess))
        shared_dict = manager.dict()

        p = Pool(
            maxSess,  # count of workers
            initializer=worker_init,  # each worker will call this on spawn
            initargs=(shared_dict, shared_list,)  # arguments for initializer
        )

        with contextlib.closing(p) as pool:
            pool.apply_async(_start_SAP)

            for i in range(num_reports):
                pool.apply_async(_zmrosales_ship_month, args=(i,))
                pool.apply_async(_zmrosales_inv_month, args=(i,))

        p.close()
        p.join()

OUTPUT:

ForkPoolWorker-2 _start_SAP 0
ForkPoolWorker-3 _zmrosales_ship_month 1 0
ForkPoolWorker-4 _zmrosales_inv_month 3 0
ForkPoolWorker-7 _zmrosales_ship_month 2 1
ForkPoolWorker-5 _zmrosales_inv_month 4 1
ForkPoolWorker-6 _zmrosales_ship_month 5 2
ForkPoolWorker-3 _zmrosales_inv_month 1 2
ForkPoolWorker-4 _zmrosales_ship_month 3 3
ForkPoolWorker-7 _zmrosales_inv_month 2 3
ForkPoolWorker-5 _zmrosales_ship_month 4 4
ForkPoolWorker-6 _zmrosales_inv_month 5 4
ForkPoolWorker-3 _zmrosales_ship_month 1 5
ForkPoolWorker-4 _zmrosales_inv_month 3 5
ForkPoolWorker-7 _zmrosales_ship_month 2 6
ForkPoolWorker-5 _zmrosales_inv_month 4 6
ForkPoolWorker-6 _zmrosales_ship_month 5 7
ForkPoolWorker-3 _zmrosales_inv_month 1 7

Upvotes: 1
