Chethu

Reputation: 555

Reading multiple large CSV files (10 GB+ each) in parallel in Python

I have a client-shared feed of 100 GB, split across 10 CSV files of 10 GB each.

Parsing all of the files sequentially to produce one final feed file takes more than a day to complete.

So I parse multiple CSV files in parallel using Python's multiprocessing Pool.

I tested the code below on three files totalling 30 GB, and it takes around 10 minutes to complete.

Can somebody look at my code and help me improve it to parse faster, or suggest a better way to parse the files?

# -*- coding: UTF-8 -*-
from multiprocessing import Pool
import time
import csv
import codecs

def unicode_csv_reader(csvfile, dialect=csv.excel, **kwargs):
    # csv.reader in Python 2 only handles byte strings, so feed it UTF-8 bytes
    # and decode each field back to unicode afterwards.
    with open(csvfile) as f:
        reader = csv.reader(codecs.iterencode(codecs.iterdecode(f, "utf-8"), "utf-8"),
                            quotechar='"', delimiter=',', quoting=csv.QUOTE_ALL,
                            skipinitialspace=True, dialect=dialect, **kwargs)
        for row in reader:
            yield [e.decode("utf-8") for e in row]


def process_file(name):
    '''Process one file: print matching rows as tab-separated fields.'''
    csv_reader = unicode_csv_reader(name)
    for row in csv_reader:
        # Keep only rows that are in stock and whose super_category is "Book".
        if row is not None and len(row) != 0 and row[1] == "in stock" and row[18] == "Book":
            linePrint = row[0]+"\t"+row[6]+"\t"+row[12]+"\t"+row[4]+"\t"+row[17]+"\t"+row[17]+"\t"+row[10]+"\t"+row[9]+"\t"+"\t"+row[18]+"\t"+row[18]+"\t"+row[8]+"\t"+row[8]+"\t\t"
            print linePrint.encode("utf-8")


def process_files_parallel():
    ''' Process each file in parallel via Pool.map() '''
    pool = Pool(processes=4)
    results = pool.map(process_file, ["t1.csv", "t2.csv", "t3.csv"])
    return results


if __name__ == '__main__':

    start = time.time()
    res = process_files_parallel()
    print res

I'm running this script on my Ubuntu machine like this:

python multiprocessfiles.py > finalfeed.csv
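
One caveat worth noting with this invocation: several worker processes printing to the same redirected stdout can interleave their lines in finalfeed.csv. Below is a minimal sketch of a variant in which each worker writes its own part file and the parts are concatenated at the end; the .part naming and output path are hypothetical, and it reuses unicode_csv_reader from the script above.

# Sketch: each worker writes to its own part file instead of a shared stdout,
# so output from different processes cannot interleave.
from multiprocessing import Pool

def process_file_to_part(name):
    '''Filter one CSV file and write the matching rows to <name>.part.'''
    out_name = name + ".part"
    with open(out_name, "w") as out:
        for row in unicode_csv_reader(name):  # reader defined in the script above
            if row and row[1] == "in stock" and row[18] == "Book":
                line = "\t".join([row[0], row[6], row[12], row[4], row[17], row[17],
                                  row[10], row[9], "", row[18], row[18], row[8], row[8], "", ""])
                out.write(line.encode("utf-8") + "\n")
    return out_name

if __name__ == '__main__':
    pool = Pool(processes=4)
    parts = pool.map(process_file_to_part, ["t1.csv", "t2.csv", "t3.csv"])
    # Concatenate the per-file parts into the final feed.
    with open("finalfeed.csv", "w") as final:
        for part in parts:
            with open(part) as p:
                final.write(p.read())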

Sample data from the client feed:

"id", "availability", "condition", "description", "image_link", "link", "title", "brand", "google_product_category", "price", "sale_price", "currency", "android_url", "android_app_name", "android_package", "discount_percentage","discount_value", "category", "super_category"
"5780705772161","in stock","new","(ise) Genetics: Analysis Of Eenes And Genomics","https://rukminim1.client.com/image/600/600/jeiukcw0/book/9/8/2/medicinal-inorganic-chemistry-original-imaf37yeyhyhzwfm.jpeg?q=90","http://www.client.com/ise-genetics-analysis-eenes-genomics/p/itmd32spserbxyhf?pid=5780705772161&marketplace=client&cmpid=content_appretar_BooksMedia_Book","(ise) Genetics: Analysis Of Eenes And Genomics","W. Jones","Books","3375","1893","INR","client://fk.dl/de_wv_CL%7Csem_--_http%3A%2F%2Fwww.client.com%2Fise-genetics-analysis-eenes-genomics%2Fp%2Fitmd32spserbxyhf~q~pid%3D5780705772161%26marketplace%3Dclient_--_cmpid_--_content_appretar_BooksMedia_Book","client","com.client.android","43","1482","BooksMedia","Book"

Upvotes: 6

Views: 1629

Answers (1)

jedi

Reputation: 585

While not exactly answering your question, this should be doable in dask, which processes data in parallel by default. Reading multiple files in parallel is as simple as this:

import dask.dataframe as dd
df = dd.read_csv('t*.csv')

More details can be found in the dask documentation.
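
Building on that snippet, here is a minimal sketch of the filtering and writing step as well. This is not part of the original answer: it assumes the column names from the sample header, that each file has a header row, and a dask version recent enough to support single_file=True.

import dask.dataframe as dd

# Read all feed files in parallel; skipinitialspace handles the space after each
# comma in the sample feed's header row.
df = dd.read_csv('t*.csv', dtype=str, skipinitialspace=True)

# Keep only in-stock books, mirroring the filter in the question's code.
books = df[(df['availability'] == 'in stock') & (df['super_category'] == 'Book')]

# Write a single tab-separated output file.
books.to_csv('finalfeed.csv', sep='\t', index=False, single_file=True)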

Upvotes: 2
