santosh kumar

Reputation: 9

File processing using multiprocessing - Python

I am a beginner to Python and am trying to write a few lines of code to convert JSON to CSV and back to JSON. I have thousands of files (about 300 MB each) to convert and process. My current program uses only 1 CPU, so it cannot make use of the server's 16 CPUs, and I need suggestions on how to tune it for multiprocessing. Below is my code (Python 3.7).

import json
import csv
import os

os.chdir('/stagingData/Scripts/test')

for JsonFile in os.listdir(os.getcwd()):
    PartialFileName = JsonFile.split('.')[0]

    j = 1
    with open(PartialFileName +".csv", 'w', newline='') as Output_File:

        with open(JsonFile) as fileHandle:
            i = 1
            for Line in fileHandle:
                try:
                    data = json.loads(Line, parse_float=str)
                except json.JSONDecodeError:
                    print("Can't load line {}".format(i))
                    continue
                if i == 1:
                    header = data.keys()
                    output = csv.writer(Output_File)
                    output.writerow(header) #Writes header row
                i += 1
                output.writerow(data.values()) #writes values row
        j += 1

I would appreciate suggestions on the multiprocessing logic.

Upvotes: 1

Views: 220

Answers (2)

joeforker

Reputation: 41817

Since you have many files, the simplest multiprocessing example from the documentation should work for you. https://docs.python.org/3.4/library/multiprocessing.html?highlight=process

from multiprocessing import Pool
import os

def f(JsonFile):
    # open input, output files and convert
    ...

with Pool(16) as p:
    p.map(f, os.listdir(os.getcwd()))

You could also try replacing listdir with os.scandir(), which doesn't have to return all directory entries before starting.
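
For example, a minimal sketch that moves the per-file conversion loop from the question into a worker function (the name convert_one, the pool size of 16 and the json.JSONDecodeError handling are my assumptions, adapt as needed):

import csv
import json
import os
from multiprocessing import Pool

def convert_one(json_file):
    # Convert one newline-delimited JSON file into a CSV next to it.
    partial_name = json_file.split('.')[0]
    with open(json_file) as src, open(partial_name + ".csv", 'w', newline='') as dst:
        writer = csv.writer(dst)
        header_written = False
        for i, line in enumerate(src, start=1):
            try:
                data = json.loads(line, parse_float=str)
            except json.JSONDecodeError:
                print("Can't load line {} of {}".format(i, json_file))
                continue
            if not header_written:
                writer.writerow(data.keys())  # header row from the first parsed record
                header_written = True
            writer.writerow(data.values())

if __name__ == '__main__':
    os.chdir('/stagingData/Scripts/test')
    with Pool(16) as p:
        p.map(convert_one, os.listdir(os.getcwd()))

Each worker handles a whole file, so with thousands of files the 16 processes stay busy without any extra coordination.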

Upvotes: 1

monkut

Reputation: 43870

If you have a single big file that you want to process more effectively, I suggest the following:

  1. Split file into chunks

  2. Create a process to process each chunk

  3. (if necessary) merge the processed chunks back into a single file (a merge sketch is shown after the code below)

Something like this:

import csv
import json
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor

source_big_file = Path('/path/to/file')

def chunk_file_by_line(source_filepath: Path, chunk_size: int = 10_000):
    intermediate_file_handlers = {}
    last_chunk_filepath = None
    with source_filepath.open('r', encoding='utf8') as big:
        for line_number, line in enumerate(big):
            group = line_number - (line_number % chunk_size)
            chunk_filename = f'{source_filepath.stem}.g{group}{source_filepath.suffix}'
            chunk_filepath = source_filepath.parent / chunk_filename
            if chunk_filepath not in intermediate_file_handlers:
                file_handler = chunk_filepath.open('w', encoding='utf8')
                intermediate_file_handlers[chunk_filepath] = file_handler
                if last_chunk_filepath:
                    # the previous chunk is complete, close it and hand it off
                    last_file_handler = intermediate_file_handlers[last_chunk_filepath]
                    last_file_handler.close()
                    yield last_chunk_filepath
            else:
                file_handler = intermediate_file_handlers[chunk_filepath]
            file_handler.write(line)
            last_chunk_filepath = chunk_filepath
    # close and output the last chunk
    if last_chunk_filepath:
        intermediate_file_handlers[last_chunk_filepath].close()
        yield last_chunk_filepath


def json_to_csv(json_filepath: Path) -> Path:
    csv_filename = f'{json_filepath.stem}.csv'
    csv_filepath = json_filepath.parent / csv_filename
    with csv_filepath.open('w', encoding='utf8', newline='') as csv_out, json_filepath.open('r', encoding='utf8') as json_in:
        dwriter = None
        for json_line in json_in:
            data = json.loads(json_line)
            if dwriter is None:
                # field names (and the header row) come from the keys of the first record
                dwriter = csv.DictWriter(csv_out, fieldnames=list(data.keys()))
                dwriter.writeheader()
            dwriter.writerow(data)
    return csv_filepath


with ProcessPoolExecutor() as pool:
    futures = []
    for chunk_filepath in chunk_file_by_line(source_big_file):
        future = pool.submit(json_to_csv, chunk_filepath)
        futures.append(future)

    # wait for all to finish
    for future in futures:
        csv_filepath = future.result(timeout=None)  # waits until complete
        print(f'conversion complete> csv filepath: {csv_filepath}')
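
The code above covers steps 1 and 2; if you also need step 3, a minimal merge sketch could look like this (merge_csv_chunks and merged_filepath are illustrative names, and it assumes every chunk CSV shares the same header):

from pathlib import Path

def merge_csv_chunks(chunk_csv_filepaths, merged_filepath: Path) -> Path:
    # Concatenate the chunk CSVs, keeping the header row from the first file only.
    with merged_filepath.open('w', encoding='utf8') as merged:
        for index, chunk_csv_filepath in enumerate(chunk_csv_filepaths):
            with chunk_csv_filepath.open('r', encoding='utf8') as chunk:
                header = chunk.readline()
                if index == 0:
                    merged.write(header)
                for line in chunk:
                    merged.write(line)
    return merged_filepath

Collect the csv_filepath values returned by the futures into a list and pass them to merge_csv_chunks once the pool has finished.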

Upvotes: 1
