Temple Jersey

Reputation: 63

Large csv to parquet using Dask - OOM

I have 7 CSV files of 8 GB each and need to convert them to Parquet.

Memory usage climbs to 100 GB and I have to kill the process. I tried distributed Dask as well: memory stays limited to 12 GB, but no output is produced for a long time. FYI, with traditional pandas using chunking plus a producer/consumer pattern I was able to convert the files in 30 minutes. What am I missing in my Dask processing?

import dask.dataframe as dd

def ProcessChunk(df, output_parquet_file, chunksize, fPQ_Schema, fCSVCols, fColTypes):
    # write the Dask dataframe out as parquet
    df.to_parquet(output_parquet_file)

for factfile in fArrFileList:
    df = dd.read_csv(factfile, blocksize="100MB",
                     dtype=fColTypes, header=None, sep='|', names=fCSVCols)
    result = ProcessChunk(df, output_parquet_file, chunksize, fPQ_Schema, fCSVCols, fColTypes)
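
For reference, the chunked pandas approach mentioned above looks roughly like this (a minimal sketch: file names, output paths, chunk size, and the placeholder column/dtype definitions are illustrative, and the producer/consumer queue is left out for brevity):

import pandas as pd

fCSVCols = ["col1", "col2"]                        # placeholder column names, not the real schema
fColTypes = {"col1": "int64", "col2": "object"}    # placeholder dtypes

# Each chunk is read, written to its own parquet part file, then released,
# so only one chunk is ever held in memory.
for i, chunk in enumerate(pd.read_csv("fact_file_1.csv", sep='|', header=None,
                                      names=fCSVCols, dtype=fColTypes,
                                      chunksize=1_000_000)):
    chunk.to_parquet(f"output/fact_file_1_part{i}.parquet", index=False)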

Upvotes: 3

Views: 1705

Answers (2)

rpanai

Reputation: 13447

I had a similar problem and found that using Dask to split the CSVs into smaller parquet files is very slow and will eventually fail. If you have access to a Linux terminal, you can use parallel or split instead. For an example of their usage, check the answers from here.

My workflow assumes your files are called file1.csv, ..., file7.csv and are stored in data/raw. I'm also assuming you are running the terminal commands from your notebook, which is why I'm adding the %%bash magic.

  • create folders data/raw_parts/part1/, ..., data/raw_parts/part7/
%%bash
for i in {1..7}
do
mkdir -p data/raw_parts/part${i}
done
  • for each file run (in case you want to use parallel)
%%bash
cat data/raw/file1.csv | parallel --header : --pipe -N1000000 'cat >data/raw_parts/part1/file_{#}.csv'
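
In case parallel (or split) isn't available, roughly the same split can be done in plain Python with the standard csv module (a sketch mirroring the command above: one million rows per piece, header repeated in every piece, streamed line by line so memory stays small):

import csv
import os

# Stream data/raw/file1.csv into ~1,000,000-row pieces without loading the whole file.
os.makedirs("data/raw_parts/part1", exist_ok=True)
with open("data/raw/file1.csv", newline="") as src:
    reader = csv.reader(src)
    header = next(reader)
    piece, rows, out = 1, 0, None
    for row in reader:
        if out is None:
            out = open(f"data/raw_parts/part1/file_{piece}.csv", "w", newline="")
            writer = csv.writer(out)
            writer.writerow(header)   # repeat the header in each piece
        writer.writerow(row)
        rows += 1
        if rows >= 1_000_000:
            out.close()
            out, rows, piece = None, 0, piece + 1
    if out is not None:
        out.close()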

convert files to parquet

  1. first create output folders
%%bash
for i in {1..7}
do
mkdir -p data/processed/part${i}
done
  2. define function to convert csv to parquet
import pandas as pd
import os
from dask import delayed, compute

# this can run in parallel
@delayed
def convert2parquet(fn, fldr_in, fldr_out):
    fn_out = fn.replace(fldr_in, fldr_out)\
               .replace(".csv", ".parquet")

    df = pd.read_csv(fn)
    df.to_parquet(fn_out, index=False)
  3. get all files you want to convert
jobs = []
fldr_in = "data/raw_parts/"
fldr_out = "data/processed/"   # mirrors the output folders created in step 1
for (dirpath, dirnames, filenames) in os.walk(fldr_in):
    if len(filenames) > 0:
        jobs += [os.path.join(dirpath, fn)
                 for fn in filenames]
  4. process all in parallel
%%time
to_process = [convert2parquet(job, fldr_in, fldr_out) for job in jobs]
out = compute(to_process)
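
Optionally (not required for the workflow above), dask.compute also accepts scheduler and num_workers arguments, so you can push the conversions into separate processes and cap how many run at once if memory is a concern:

# variant of the call above: multiprocessing scheduler, limited concurrency
out = compute(*to_process, scheduler="processes", num_workers=4)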

Upvotes: 1

Temple Jersey

Reputation: 63

Thanks all for the suggestions. map_partitions worked.

df = dd.read_csv(filename, blocksize="500MB",
                 dtype=fColTypes, header=None, sep='|', names=fCSVCols)
df.map_partitions(DoWork, output_parquet_file, chunksize, Schema, CSVCols, fColTypes).compute(num_workers=2)
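
DoWork isn't shown above; a minimal sketch of what such a partition handler can look like, simplified to a single output-directory argument (names and paths are illustrative, not my exact code):

import uuid
import pandas as pd

def DoWork(part, output_dir):
    # map_partitions calls this once per partition with a plain pandas DataFrame;
    # each non-empty partition is written to its own parquet file.
    if len(part):  # Dask may also call this on an empty frame to infer metadata
        part.to_parquet(f"{output_dir}/part-{uuid.uuid4().hex}.parquet", index=False)
    # return a tiny DataFrame so map_partitions has a well-defined result per partition
    return pd.DataFrame({"rows_written": [len(part)]})

# used as: df.map_partitions(DoWork, "output_dir").compute(num_workers=2)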

But the same approach didn't work well with a Dask distributed local cluster; it only worked in local cluster mode when the CSV size was under 100 MB.

Upvotes: 2
