whacke

Reputation: 63

Python multiprocessing to process files

I've never done anything with multiprocessing before, but I recently ran into a problem with one of my projects taking an excessive amount of time to run. I have about 336,000 files I need to process, and a traditional for loop would likely take about a week to run.

There are two loops to do this, but they are effectively identical in what they return, so I've only included one.

import json
import os
from tqdm import tqdm
import multiprocessing as mp

jsons = os.listdir('/content/drive/My Drive/mrp_workflow/JSONs')

materials = [None] * len(jsons)

def asyncJSONs(file, index):
  try:
    with open('/content/drive/My Drive/mrp_workflow/JSONs/{}'.format(file)) as f:
        data = json.loads(f.read())
    properties = process_dict(data, {})
    properties['name'] = file.split('.')[0]
    materials[index] = properties
  except:
    print("Error parsing at {}".format(file))

process_list = []
i = 0
for file in tqdm(jsons):
    p = mp.Process(target=asyncJSONs,args=(file,i))
    p.start()
    process_list.append(p)
    i += 1

for process in process_list:
  process.join()

Everything in there relating to multiprocessing was cobbled together from a collection of Google searches and articles, so I wouldn't be surprised if it isn't remotely correct. For example, the 'i' variable is a dirty attempt to keep the results in some kind of order.

What I'm trying to do is load information from those JSON files and store it in the materials variable, but when I run my current code, nothing is stored in materials.

Upvotes: 1

Views: 149

Answers (3)

furas

Reputation: 142631

As you can read in the other answers, processes don't share memory, so you can't set a value directly in materials. The function has to return its result to the main process, and the main process has to wait for that result and collect it.

It is simpler with Pool: you don't have to manage a queue yourself, it returns results in the same order as the input all_jsons, and you can choose how many worker processes run at the same time so the job doesn't hog every CPU on the system.

One drawback: a plain pool.map doesn't give you a tqdm progress bar (though pool.imap can be wrapped in tqdm; see the sketch after the code).

I couldn't test it, but it could be something like this:

import os
import json
from multiprocessing import Pool

# --- functions ---

def asyncJSONs(filename):
    try:
        fullpath = os.path.join(folder, filename)
        with open(fullpath) as f:
            data = json.loads(f.read())
        properties = process_dict(data, {})
        properties['name'] = filename.split('.')[0]
        return properties
    except Exception as e:
        print("Error parsing {}: {}".format(filename, e))

# --- main ---

# needed by all processes (on systems that spawn workers it has to stay outside `__main__` so they can see it)
folder = '/content/drive/My Drive/mrp_workflow/JSONs'

if __name__ == '__main__':
    # code only for main process
    
    all_jsons = os.listdir(folder)

    with Pool(5) as p:
        materials = p.map(asyncJSONs, all_jsons)

    for item in materials:
        print(item)    
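
If you do want a progress bar, pool.imap can be wrapped in tqdm because it yields results one at a time. A rough, untested sketch, reusing asyncJSONs and folder from the code above:

from multiprocessing import Pool
from tqdm import tqdm

if __name__ == '__main__':
    all_jsons = os.listdir(folder)

    with Pool(5) as p:
        # imap yields results in input order as they become available, so tqdm can count them
        materials = list(tqdm(p.imap(asyncJSONs, all_jsons), total=len(all_jsons)))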

BTW:

Other modules worth a look: concurrent.futures, joblib, ray.
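
For example, with concurrent.futures the same job might look like this (untested sketch, reusing asyncJSONs and folder from the code above):

from concurrent.futures import ProcessPoolExecutor

if __name__ == '__main__':
    all_jsons = os.listdir(folder)

    # executor.map also keeps results in the same order as all_jsons
    with ProcessPoolExecutor(max_workers=5) as executor:
        materials = list(executor.map(asyncJSONs, all_jsons))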

Upvotes: 3

Tim Roberts

Reputation: 54635

You need to understand how multiprocessing works. Your code starts a brand new process for EACH task, each with a brand new Python interpreter that runs your script all over again. These processes do not share memory in any way. The other processes get a COPY of your globals, but they are not the same memory.

If you need to send information back, you can use a multiprocessing.Queue. Have the function put its results in the queue, while your main code waits for results to magically appear in the queue.
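
A minimal sketch of that queue pattern, based on the question's loop (the parsing is simplified to a placeholder, and the file list is hypothetical):

import multiprocessing as mp

def worker(filename, index, queue):
    properties = {'name': filename.split('.')[0]}  # placeholder for the real JSON parsing
    queue.put((index, properties))                 # send the result back to the main process

if __name__ == '__main__':
    files = ['a.json', 'b.json']                   # hypothetical file list
    queue = mp.Queue()
    processes = []
    for i, f in enumerate(files):
        p = mp.Process(target=worker, args=(f, i, queue))
        p.start()
        processes.append(p)

    materials = [None] * len(files)
    for _ in files:
        index, properties = queue.get()            # blocks until a worker puts something
        materials[index] = properties

    for p in processes:
        p.join()                                   # join after draining the queue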

Also, PLEASE read the instructions in the multiprocessing docs about the main module. Each new process will re-execute all the code in your main file. Thus, any one-time stuff absolutely must be contained in an

if __name__ == "__main__":

block. This is one case where putting your top-level code into a function called main() really is a best practice.
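
In other words, the overall shape ends up something like this (just the skeleton, not the full script):

import os

def main():
    # all one-time work lives here: build the file list, spawn workers, collect results
    jsons = os.listdir('/content/drive/My Drive/mrp_workflow/JSONs')
    ...

if __name__ == "__main__":
    # child processes re-import this file, but they never call main()
    main()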


What is taking all the time here? Is it reading the files? If so, you might be able to do this with multithreading instead of multiprocessing (sketched below). However, if you are limited by raw disk speed, then no amount of parallelism is going to reduce your run time.
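
If reading the files does turn out to be the bottleneck, a thread version is a small change. Here is an untested sketch that skips the question's process_dict step and just loads the JSON:

import json
import os
from multiprocessing.pool import ThreadPool

folder = '/content/drive/My Drive/mrp_workflow/JSONs'

def parse_one(filename):
    # threads share memory, so returning results (or appending to a shared list) just works
    with open(os.path.join(folder, filename)) as f:
        return json.load(f)

if __name__ == '__main__':
    files = os.listdir(folder)
    with ThreadPool(16) as pool:
        materials = pool.map(parse_one, files)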

Upvotes: 0

DeusXMachina

Reputation: 1399

Going to mention a totally different way of solving this problem: don't bother trying to append all the data to the same list. Extract the data you need and append it to a target file in ndjson/jsonlines format. That's a format where, instead of objects inside a JSON array [{},{}...], you have one object per line.

{"foo": "bar"} 
{"foo": "spam"} 
{"eggs": "jam"} 

The workflow looks like this:

  1. Spawn N workers with a manifest of files to process and the output file to write to. You don't even need MP; you could use a tool like rush to parallelize.
  2. Each worker parses its data and generates the output dict.
  3. Each worker opens the output file with the append flag, dumps the data and flushes immediately (a fuller sketch follows this list):
with open(out_file, 'a') as fp: 
  print(json.dumps(data), file=fp, flush=True) 
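
Put together, a worker for steps 2 and 3 could look roughly like this (untested sketch; process_dict is the question's own function, the output path and worker count are just placeholders):

import json
import os
from multiprocessing import Pool

folder = '/content/drive/My Drive/mrp_workflow/JSONs'
out_file = 'materials.ndjson'   # hypothetical output path

def process_file(filename):
    with open(os.path.join(folder, filename)) as f:
        data = json.load(f)
    properties = process_dict(data, {})            # from the question
    properties['name'] = filename.split('.')[0]
    # append one JSON object per line and flush straight away
    with open(out_file, 'a') as fp:
        print(json.dumps(properties), file=fp, flush=True)

if __name__ == '__main__':
    with Pool(5) as pool:
        pool.map(process_file, os.listdir(folder))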

Flushing ensures that, as long as each write is smaller than the buffer size on your kernel (usually several MB), your different processes won't stomp on each other and corrupt each other's writes. If they do conflict, you may need to write to a separate output file for each worker and then join them all.

You can join the files and/or convert them to a regular JSON array with jq if needed. To be honest, though, just embrace jsonlines: it's a way better data format for long lists of objects, since you don't have to parse the whole thing into memory.
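
Reading the result back later is equally cheap, because you can stream it one object at a time (hypothetical file name):

import json

with open('materials.ndjson') as fp:
    for line in fp:
        item = json.loads(line)
        # ... do something with item ...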

Upvotes: 1
