Reputation: 1162
I have a large script that processes terabytes of gridded weather/climate data. It uses an outer loop over years (1979 to 2024); within each year it loops over months (1-12), since the data come in monthly files, and within each month it loops over each hour or station, as appropriate. These loops fill arrays shaped like [hr, stn, pressure level] or [hr, stn], depending on the variable. This part works fine.
Once I have the data in the arrays, I use the multiprocessing pool.starmap function to run, in parallel, metpy package calculations that only operate on 1D arrays. This is where something weird happens. The program now seems to go back to the outer loop over years and begins to run "# begin by processing some files that come in yearly format" again.
Here is my code, which is generalized, as I have had a lot of trouble trying to reproduce this error with a much smaller example.
import numpy as np
from multiprocessing import Pool
import metpy.calc as mpcalc

# loop over years
for yr in np.arange(46) + 1979:
    # begin by processing some files that come in yearly format
    # loop over months
    for mo in range(1, 13):
        # open and put data from specific lat/lon points into arrays by variable
        # loop over lat/lon locations ("stations")
        for station in range(50):
            for hr in range(2920):
                # fill arrays
                ...
    # go back to working within outer loop (years) for starting multiprocessing work on filled arrays
    with Pool(processes=16) as pool1:
        tw_sfc_pooled = pool1.starmap(mpcalc.wet_bulb_temperature, tw_sfc_argument_list)
        bulk_shear_1km_pooled = pool1.starmap(mpcalc.bulk_shear, bulk_shear_1km_argument_list)
        many_more_pooled = pool1.starmap(mpcalc.func, many_more_argument_list)  # ...and many more calls like this
        pool1.close()  # am I closing this wrong?
        pool1.join()   # do I need this statement here?
    # put pooled lists into final variable arrays for use in work
The code works fine until the multiprocessing ends. From there, it goes back to the outer loop and begins to read in the next year's data in parallel, with many xarray and other errors (I'm using xarray to read the netCDF files). So the first year of data processing goes fine, but everything fails for the 2nd year.
My guess is that either multiprocessing isn't being started or stopped correctly in my code, or that it doesn't like being inside a loop, or maybe the problems arise from having 20+ pool.starmap calls. I don't know.
Any thoughts on what the problem is?
Upvotes: 1
Views: 118
Reputation: 44283
I don't see anything in your code that should result in the problem you are encountering. I am posting this more as an explanation as to what your code is doing and what you might try to resolve the problem and increase performance.
One first creates a multiprocessing.pool.Pool instance and then submits "tasks" to the pool using one of the methods a pool offers for doing this, e.g. apply, apply_async, map, map_async, starmap, starmap_async, etc. You are using starmap, which in your case submits multiple tasks and blocks until all of those tasks have completed. In general, at any point in time some or all of your pool's N processes may be working on a task, while other tasks that have already been submitted sit on a "queue" waiting for a pool process to become idle so it can take up the work of processing one of them.
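As a rough illustration of the difference between the blocking starmap call and its non-blocking starmap_async counterpart (using a made-up square_plus worker rather than your mpcalc functions):

from multiprocessing import Pool

def square_plus(x, y):  # hypothetical stand-in for an mpcalc function
    return x ** 2 + y

if __name__ == '__main__':
    args = [(i, 10) for i in range(8)]
    with Pool(processes=4) as pool:
        # starmap blocks until every task built from args has been processed:
        results = pool.starmap(square_plus, args)
        # starmap_async returns immediately; .get() blocks until the results are ready:
        async_result = pool.starmap_async(square_plus, args)
        more_results = async_result.get()
    print(results, more_results)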
Since you are using a context manager with the pool (i.e. with ... as pool1:), when that block exits it will call terminate on the pool: any task currently being executed by a pool process will be terminated, and any task waiting to be executed will be flushed and never executed. You have also coded calls to close and join following the calls to starmap. close prevents any new tasks from being submitted; once the pool processes have finished all tasks that have already been submitted, they will terminate, knowing that no more tasks will ever arrive. The call to join can then be used to wait for the pool processes to complete. Once the call to join returns, you know that all submitted tasks have been processed.
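To make the distinction concrete, here is a minimal sketch (with a made-up slow_task worker) of the two shutdown paths:

from multiprocessing import Pool
import time

def slow_task(i):  # hypothetical worker that just sleeps
    time.sleep(1)
    return i

if __name__ == '__main__':
    # close() + join(): wait for everything already submitted to finish.
    pool = Pool(processes=4)
    results = [pool.apply_async(slow_task, args=(i,)) for i in range(8)]
    pool.close()
    pool.join()  # returns only after all 8 tasks have run

    # terminate(): stop immediately; queued and in-flight tasks are abandoned.
    pool = Pool(processes=4)
    for i in range(8):
        pool.apply_async(slow_task, args=(i,))
    pool.terminate()  # most of these tasks will never run
    pool.join()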
However, since you are using method starmap, you know that it returns only when all tasks submitted with that call have completed, so calling close and join becomes unnecessary. And with those calls in place, the implicit call to terminate that happens when the with block exits becomes superfluous, as the pool has already been shut down. (You could, if you want, still call join on the pool at that point to block until the implicit call to terminate has finished terminating all of the pool processes.) So typically one uses close and join when a context manager is not being used and/or you want to be sure that all tasks that have been submitted have completed. For example:
pool = Pool()
for i in range(10):
    pool.apply_async(worker, args=(i,))
# Wait for all submitted tasks to complete and destroy the pool:
pool.close()
pool.join()
In your code there is no need to repeatedly create and destroy a multiprocessing pool. I would suggest that you try the following:
with Pool(processes=16) as pool1:
    # loop over years
    for yr in range(1979, 2025):
        # begin by processing some files that come in yearly format
        # loop over months
        for mo in range(1, 13):
            # open and put data from specific lat/lon points into arrays by variable
            # loop over lat/lon locations ("stations")
            for station in range(50):
                for hr in range(2920):
                    # fill arrays
                    ...
        # go back to working within outer loop (years) for starting multiprocessing work on filled arrays
        tw_sfc_pooled = pool1.starmap(mpcalc.wet_bulb_temperature, tw_sfc_argument_list)
        bulk_shear_1km_pooled = pool1.starmap(mpcalc.bulk_shear, bulk_shear_1km_argument_list)
        many_more_pooled = pool1.starmap(mpcalc.func, many_more_argument_list)  # ...and many more calls like this
You could even use starmap_async as follows (but it will probably not result in the work completing sooner, given the number of tasks being submitted vs. the pool size):
with Pool(processes=16) as pool1:
    # loop over years
    for yr in range(1979, 2025):
        # begin by processing some files that come in yearly format
        # loop over months
        for mo in range(1, 13):
            # open and put data from specific lat/lon points into arrays by variable
            # loop over lat/lon locations ("stations")
            for station in range(50):
                for hr in range(2920):
                    # fill arrays
                    ...
        # go back to working within outer loop (years) for starting multiprocessing work on filled arrays
        async_result_1 = pool1.starmap_async(mpcalc.wet_bulb_temperature, tw_sfc_argument_list)
        async_result_2 = pool1.starmap_async(mpcalc.bulk_shear, bulk_shear_1km_argument_list)
        async_result_3 = pool1.starmap_async(mpcalc.func, many_more_argument_list)  # ...and many more calls like this
        tw_sfc_pooled = async_result_1.get()
        bulk_shear_1km_pooled = async_result_2.get()
        many_more_pooled = async_result_3.get()
Upvotes: 0
Reputation: 33
Edit: I should mention, I'm using pypy.
Edit 2: This seems to be an open issue, both in cpython and in pypy.
Funnily enough, I posted about the same problem at around the same time as you! This is my post. You can see in the description that I'm hinting at the same problem as you are, that perhaps the processes aren't being handled correctly.
I took a look at the documentation of the multiprocessing lib and saw the following regarding Pool objects:
join()
Wait for the worker processes to exit. One must call close() or terminate() before using join().
and also:
Changed in version 3.3: Pool objects now support the context management protocol – see Context Manager Types. __enter__() returns the pool object, and __exit__() calls terminate().
You and I both use the Pool object within a context management protocol using the with ... as syntax. This means that, upon exiting the block, the pool should terminate.
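In other words, as far as I understand that documentation, the two forms below should be roughly equivalent (a sketch with a placeholder double worker, not our real code):

from multiprocessing import Pool

def double(x):  # placeholder worker
    return 2 * x

if __name__ == '__main__':
    # Context-manager form: __exit__() calls terminate() when the block ends.
    with Pool(processes=4) as pool:
        results = pool.map(double, range(10))

    # Explicit form doing the same thing:
    pool = Pool(processes=4)
    try:
        results = pool.map(double, range(10))
    finally:
        pool.terminate()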
I tried calling join() on the pool after exiting the with block, as the first piece of documentation suggests, but the problem persisted.
For me, the problem was fixed by manually calling gc.collect() after the block. So, manually calling the garbage collector may solve your issue, but documentation suggests that we shouldn't rely on this, if my understanding is correct:
Warning
multiprocessing.pool objects have internal resources that need to be properly managed (like any other resource) by using the pool as a context manager or by calling close() and terminate() manually. Failure to do this can lead to the process hanging on finalization.
Note that it is not correct to rely on the garbage collector to destroy the pool as CPython does not assure that the finalizer of the pool will be called (see object.__del__() for more information).
This suggests that using the pool within a with block is enough for it to be properly managed, but that does not seem to be the case for either of us.
Try calling gc.collect() after the with block; maybe it will solve the problem in your case too.
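Roughly, the workaround looks like this (a sketch with a placeholder work function, not your actual metpy calls):

import gc
from multiprocessing import Pool

def work(x):  # placeholder worker
    return x * x

if __name__ == '__main__':
    for yr in range(1979, 2025):
        with Pool(processes=16) as pool1:
            results = pool1.starmap(work, [(i,) for i in range(100)])
        # Explicitly collect the terminated pool before starting the next iteration:
        gc.collect()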
Upvotes: 0