HFBrowning
HFBrowning

Reputation: 2336

Multiprocessing time increases linearly with more cores

I have an arcpy process that requires doing a union on a bunch of layers, running some calculations, and writing an HTML report. Given the number of reports I need to generate (~2,100) I need this process to be as quick as possible (my target is 2 seconds per report). I've tried a number of ways to do this, including multiprocessing, when I ran across a problem, namely, that running the multi-process part essentially takes the same amount of time no matter how many cores I use.

For instance, for the same number of reports:

and so on. It works out to the same total time because churning through twice as many at a time takes twice as long to do.

Does this mean my problem is I/O bound, rather than CPU bound? (And if so - what do I do about it?) I would have thought it was the latter, given that the large bottleneck in my timing is the union (it takes up about 50% of the processing time). Unions are often expensive in ArcGIS, so I assumed breaking it up and running 2 - 10 at once would have been 2 - 10 times faster. Or, potentially I implementing multi-process incorrectly?

## Worker function just included to give some context

def worker(sub_code):
    layer = 'in_memory/lyr_{}'.format(sub_code)
    arcpy.Select_analysis(subbasinFC, layer, where_clause="SUB_CD = '{}'".format(sub_code))
    arcpy.env.extent = layer
    union_name = 'in_memory/union_' + sub_code

    arcpy.Union_analysis([fields],
                     union_name,
                     "NO_FID", "1 FEET")
    #.......Some calculations using cursors

    # Templating using Jinjah
    context = {}
    context['DATE'] = now.strftime("%B %d, %Y")
    context['SUB_CD'] = sub_code
    context['SUB_ACRES'] = sum([r[0] for r in arcpy.da.SearchCursor(union, ["ACRES"], where_clause="SUB_CD = '{}'".format(sub_code))])
    # Etc

    # Then write the report out using custom function
    write_html('template.html', 'output_folder', context)


if __name__ == '__main__':
    subList = sorted({r[0] for r in arcpy.da.SearchCursor(subbasinFC, ["SUB_CD"])})
    NUM_CORES = 7
    chunk_list = [subList[i:i+NUM_CORES] for i in range(0, len(subList), NUM_CORES-1)]
    for chunk in chunk_list:
        jobs = []
        for subbasin in chunk:
            p = multiprocessing.Process(target=worker, args=(subbasin,))
            jobs.append(p)
            p.start()

        for process in jobs:
            process.join()

Upvotes: 0

Views: 349

Answers (3)

aghast
aghast

Reputation: 15310

You don't show us quite enough to be sure what you are doing. For example, what is your env.workspace? And what is the value of subbasinFC? It seems like you're doing an analysis at the beginning of each process to filter down the data into layer. But is subbasinFC coming from disk, or from memory? If it's from disk, I'd suggest you read everything into memory before any of the processes try their filtering. That should speed things along, if you have the memory to support it. Otherwise, yeah, you're I/O bound on the input data.

Forgive my arcpy cluelessness, but why are you inserting a where clause in your sum of context['SUB_ACRES']? Didn't you already filter on sub_code at the start? (We don't know what the union is, so maybe you're unioning with something unfiltered...)

Upvotes: 1

Tim Peters
Tim Peters

Reputation: 70602

There isn't much to go on here, and I have no experience with ArcGIS. So I can just note two higher-level things. First, "the usual" way to approach this would be to replace all the code below your NUM_CORES = 7 with:

pool = multiprocessing.Pool(NUM_CORES)
pool.map(worker, subList)
pool.close()
pool.join()

map() takes care of keeping all the worker processes as busy as possible. As is, you fire up 7 processes, then wait for all of them to finish. All the processes that complete before the slowest vanish, and their cores sit idle waiting for the next outer loop iteration. A Pool keeps the 7 processes alive for the duration of the job, and feeds each a new piece of work to do as soon as it finishes its last piece of work.

Second, this part ends with a logical error:

chunk_list = [subList[i:i+NUM_CORES] for i in range(0, len(subList), NUM_CORES-1)]

You want NUM_CORES there rather than NUM_CORES-1. As-is, the first time around you extract

subList[0:7]

then

subList[6:13]

then

subList[12:19]

and so on. subList[6] and subList[12] (etc) are extracted twice each. The sublists overlap.

Upvotes: 3

tatlar
tatlar

Reputation: 3176

I'm not sure you are using the Process pool correctly to track your jobs. This:

for subbasin in chunk:
    p = multiprocessing.Process(target=worker, args=(subbasin,))
    jobs.append(p)
    p.start()

    for process in jobs:
        process.join()

Should instead be:

for subbasin in chunk:
    p = multiprocessing.Process(target=worker, args=(subbasin,))
    p.start()
    p.join()

Is there a specific reason you are going against the spec of using the multiprocessing library? You are not waiting until the thread terminates before spinning another process up, which is just going to create a whole bunch of processes that are not handled by the parent calling process.

Upvotes: 0

Related Questions