Reputation: 447
I have a large list L to operate over. Let f() be the function that operates on each item of L. f() takes a second argument, which expires every 15 minutes and needs to be renewed. Here is an example, in serial:
import datetime as dt

def main():
    L = openList()
    # START THE CLOCK
    clockStart = dt.datetime.now()
    clockExp = clockStart + dt.timedelta(seconds=900)
    a = getRenewed()
    for item in L:
        f(item, a)  # operate on item given a
        # CHECK TIME REMAINING
        clockCur = dt.datetime.now()
        clockRem = (clockExp - clockCur).total_seconds()
        # RENEW a IF NEEDED
        if clockRem < 5:  # renew with 5 seconds left
            clockStart = dt.datetime.now()
            clockExp = clockStart + dt.timedelta(seconds=900)
            a = getRenewed()
Since f() takes a few seconds (or sometimes longer), I would like to parallelize the code. Any tips for how to do that given the timer? I envision sharing clockExp and "a", and when a process sees clockRem < 5, it calls getRenewed() and shares the new "a" and clockExp with the other processes, and so on.
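Roughly, the sharing I have in mind might look like this untested sketch, using multiprocessing.Manager to hold the shared "a" and clockExp, with a lock so only one process renews at a time:

import datetime as dt
import multiprocessing

def init_shared(shared_, lock_):
    # give each worker proxies to the shared dict and lock
    global shared, lock
    shared, lock = shared_, lock_

def worker_shared(item):
    with lock:  # only one process checks/renews at a time
        clockRem = (shared['clockExp'] - dt.datetime.now()).total_seconds()
        if clockRem < 5:  # renew with 5 seconds left
            shared['a'] = getRenewed()
            shared['clockExp'] = dt.datetime.now() + dt.timedelta(seconds=900)
        a = shared['a']
    f(item, a)

def main():
    L = openList()
    with multiprocessing.Manager() as manager:
        shared = manager.dict()
        shared['a'] = getRenewed()
        shared['clockExp'] = dt.datetime.now() + dt.timedelta(seconds=900)
        lock = manager.Lock()
        with multiprocessing.Pool(initializer=init_shared,
                                  initargs=(shared, lock)) as pool:
            pool.map(worker_shared, L)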
Upvotes: 1
Views: 1683
Reputation: 104712
If getRenewed is idempotent (that is, you can call it multiple times without side effects), you can simply move your existing timer code to your worker processes, and let each of them call it once when it notices its own timer has run down. This only requires synchronization for the items from the list that you pass in, and multiprocessing.Pool can handle that easily enough:
import datetime as dt
import multiprocessing

def setup_worker():
    global clockExp, a
    clockStart = dt.datetime.now()
    clockExp = clockStart + dt.timedelta(seconds=900)
    a = getRenewed()

def worker(item):
    global clockExp, a
    clockCur = dt.datetime.now()
    clockRem = (clockExp - clockCur).total_seconds()
    if clockRem < 5:  # renew with 5 seconds left
        clockStart = dt.datetime.now()
        clockExp = clockStart + dt.timedelta(seconds=900)
        a = getRenewed()
    f(item, a)

def main(L):
    pool = multiprocessing.Pool(initializer=setup_worker)  # setup_worker runs once per process
    pool.map(worker, L)
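One practical note: on platforms that spawn rather than fork worker processes (Windows, and macOS by default on current Pythons), the pool setup must run under a __main__ guard. A hypothetical invocation (openList is the function from your question) would look like:

if __name__ == '__main__':
    main(openList())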
If getRenewed is not idempotent, things will need to be a little more complicated. You won't be able to call it in each worker process, so you'll need to set up some kind of communication method between your processes so they can each get the latest version when it is available.
I'd suggest using a multiprocessing.Queue to pass the a value from the main process to the workers. You can still use a Pool for the list items; you just need to make sure you use it asynchronously from the main process. Like this, perhaps:
import os
import queue  # needed for the queue.Empty exception in worker2

def setup_worker2(queue):
    global a_queue, a, clockExp
    a_queue = queue
    a = a_queue.get()  # wait for the first `a` value
    clockStart = dt.datetime.now()
    clockExp = clockStart + dt.timedelta(seconds=900)
def worker2(item):
    global a, clockExp
    clockCur = dt.datetime.now()
    clockRem = (clockExp - clockCur).total_seconds()
    if clockRem < 60:  # start checking for a new `a` value 60 seconds before it's needed
        try:
            a = a_queue.get_nowait()
            clockStart = dt.datetime.now()
            clockExp = clockStart + dt.timedelta(seconds=900)
        except queue.Empty:
            pass
    return f(item, a)
def main2(L):
    queue = multiprocessing.Queue()  # set up the queue for the `a` values
    pool = multiprocessing.Pool(initializer=setup_worker2, initargs=(queue,))
    result = pool.map_async(worker2, L)  # send the items to the pool asynchronously
    while True:  # loop for sending `a` values through the queue
        a = getRenewed()  # get a new value
        for _ in range(os.cpu_count()):
            queue.put(a)  # send one copy per worker process
        try:
            # get() raises multiprocessing.TimeoutError if the results aren't
            # ready in time (wait() would return silently instead)
            result.get(900 - 5)  # sleep for ~15 minutes, or until the results are ready
        except multiprocessing.TimeoutError:
            pass  # if we got a timeout, keep looping!
        else:
            break  # if not, we are done, so break out of the loop!
The workers still need some timing code, because otherwise you'd face a race condition where one worker might consume two of the a values sent down the queue in a single batch from the main process. Resetting clockExp after a successful get is what keeps a worker away from the queue until the next batch is actually due. The double-consumption could otherwise happen if some of the calls to f are significantly slower than others (which is probably likely if they involve downloading things from the web).
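If you want to sanity-check the plumbing of the queue version before wiring in the real functions, a stand-in harness like this will run it end to end (dummy f and getRenewed; every name here is a placeholder, and it assumes it lives in the same module as main2 and friends):

import random
import time

def getRenewed():
    return random.random()  # stand-in for the real renewal call

def f(item, a):
    time.sleep(0.1)  # stand-in for the real per-item work
    return item, a

if __name__ == '__main__':
    main2(list(range(100)))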
Upvotes: 3