Kevin

Reputation: 447

Multiprocessing subject to timer

I have a large list L to operate over. Let f() be the function which operates on L. f() takes another variable, which expires every 15 minutes and needs to be renewed. Here is an example, in serial:

import datetime as dt

def main():
    L = openList()
    # START THE CLOCK
    clockStart = dt.datetime.now()
    clockExp = clockStart + dt.timedelta(seconds=900)
    a = getRenewed()
    for item in L:
        f(item, a)   # operate on item given a
        # CHECK TIME REMAINING
        clockCur = dt.datetime.now()
        clockRem = (clockExp - clockCur).total_seconds()
        # RENEW a IF NEEDED
        if clockRem < 5: # renew with 5 seconds left
            clockStart = dt.datetime.now()
            clockExp = clockStart + dt.timedelta(seconds=900)
            a = getRenewed()

Since f() takes a few seconds (or sometimes longer), I would like to parallelize the code. Any tips for how to do that given the timer? I envision sharing clockExp and "a", and when a process satisfies clockRem < 5, it calls getRenewed() and shares the new "a" and clockExp, and repeat.

Upvotes: 1

Views: 1683

Answers (1)

Blckknght

Reputation: 104712

If getRenewed is idempotent (that is, you can call it multiple times without side effects), you can simply move your existing timer code to your worker processes, and let them each call it once when they notice their own timer has run down. This only requires synchronization for the items from the list that you pass in, and multiprocessing.Pool can handle that easily enough:

import datetime as dt
import multiprocessing

def setup_worker():
    global clockExp, a

    clockStart = dt.datetime.now()
    clockExp = clockStart + dt.timedelta(seconds=900)
    a = getRenewed()

def worker(item):
    global clockExp, a

    clockCur = dt.datetime.now()
    clockRem = (clockExp - clockCur).total_seconds()

    if clockRem < 5: # renew with 5 seconds left
        clockStart = dt.datetime.now()
        clockExp = clockStart + dt.timedelta(seconds=900)
        a = getRenewed()

    f(item, a)

def main(L):
    pool = multiprocessing.Pool(initializer=setup_worker)

    pool.map(worker, L)

If getRenewed is not idempotent, things will need to be a little more complicated. You won't be able to call it in each worker process, so you'll need to set up some kind of communication channel between your processes so they can each get the latest version when it becomes available.

I'd suggest using a multiprocessing.Queue to pass the `a` value from the main process to the workers. You can still use a Pool for the list items, you just need to make sure you use it asynchronously from the main process. Like this, perhaps:

import datetime as dt
import multiprocessing
import os
import queue    # needed for the queue.Empty exception below

def setup_worker2(q):
    global a_queue, a, clockExp

    a_queue = q
    a = a_queue.get()    # wait for the first `a` value
    clockStart = dt.datetime.now()
    clockExp = clockStart + dt.timedelta(seconds=900)

def worker2(item):
    global a, clockExp

    clockCur = dt.datetime.now()
    clockRem = (clockExp - clockCur).total_seconds()
    if clockRem < 60: # start checking for a new `a` value 60 seconds before it's needed
        try:
            a = a_queue.get_nowait()
            clockStart = dt.datetime.now()
            clockExp = clockStart + dt.timedelta(seconds=900)
        except queue.Empty:
            pass

    return f(item, a)

def main2(L):
    a_queue = multiprocessing.Queue()   # queue for passing `a` values to the workers

    pool = multiprocessing.Pool(initializer=setup_worker2, initargs=(a_queue,))

    result = pool.map_async(worker2, L) # send the items to the pool asynchronously

    while True:                   # loop for sending `a` values through the queue
        a = getRenewed()          # get a new value
        for _ in range(os.cpu_count()):
            a_queue.put(a)        # send one copy per worker process

        result.wait(900 - 5)      # sleep for ~15 minutes, or until the result is ready
        if result.ready():        # wait() returns silently on timeout rather than
            break                 # raising, so check ready() to see if we are done

The workers still need to have some timing code, because otherwise you'd face a race condition where one worker might consume two of the `a` values sent down the queue in a single batch from the main process. That could happen if some of the calls to f are significantly slower than others (which is probably likely if they involve downloading things from the web).
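The race that the timing guard prevents can be sketched with a plain in-process queue (a toy illustration, not the multiprocessing version): if workers polled the queue unconditionally after every item, one fast worker could drain more than its share of a batch:

```python
import queue

q = queue.Queue()
for _ in range(4):          # main process sends one copy of the new `a` per worker
    q.put("a_v2")

# A worker that polls without a timing guard might grab two copies...
fast_worker = [q.get_nowait(), q.get_nowait()]

# ...leaving only two copies for the three remaining workers, so one of
# them would keep using a stale `a` until the next batch arrives.
print(len(fast_worker), q.qsize())
```

With the `clockRem < 60` check, a worker only polls the queue when its own value is close to expiring, so it can't eat a second copy from the same batch.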

Upvotes: 3
