Reputation: 63
I have 7 CSV files of 8 GB each that I need to convert to Parquet.
Memory usage climbs to 100 GB and I had to kill the process. I also tried Dask Distributed with memory limited to 12 GB, but it produced no output for a long time. FYI, with traditional pandas using chunking plus a producer/consumer pattern I was able to convert the files in 30 minutes. What am I missing in my Dask processing?
import dask.dataframe as dd

def ProcessChunk(df, ...):
    df.to_parquet()

for factfile in fArrFileList:
    df = dd.read_csv(factfile, blocksize="100MB",
                     dtype=fColTypes, header=None, sep='|', names=fCSVCols)
    result = ProcessChunk(df, output_parquet_file, chunksize, fPQ_Schema, fCSVCols, fColTypes)
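(For comparison, the chunked-pandas baseline mentioned above, minus the producer/consumer part, might look roughly like this; it is only a sketch, and the function name, separator, and chunk size are assumptions, not the asker's actual code.)

import pandas as pd

# Sketch of a chunked pandas CSV-to-Parquet conversion: only one chunk is
# held in memory at a time, and each chunk becomes its own Parquet file.
def csv_to_parquet_chunked(csv_path, parquet_dir, cols, dtypes, chunksize=1_000_000):
    for i, chunk in enumerate(pd.read_csv(csv_path, sep='|', header=None,
                                          names=cols, dtype=dtypes,
                                          chunksize=chunksize)):
        chunk.to_parquet(f"{parquet_dir}/part_{i:05d}.parquet", index=False)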
Upvotes: 3
Views: 1705
Reputation: 13447
I had a similar problem and found that using Dask to split the files into many small Parquet files is very slow and eventually fails. If you have access to a Linux terminal, you can use parallel or split instead. For an example of their usage, check the answers from here.
My workflow supposes your files are called file1.csv, ..., file7.csv and are stored in data/raw. I'm assuming you are running the terminal commands from your notebook, which is why I'm adding the %%bash magic.

First, create the output folders data/raw_parts/part1/, ..., data/raw_parts/part7/:
%%bash
# create one folder per input file for the split CSV chunks
for i in {1..7}
do
    mkdir -p data/raw_parts/part${i}
done
%%bash
# split file1.csv into chunks of 1,000,000 rows each (repeat for the other files)
cat data/raw/file1.csv | parallel --header : --pipe -N1000000 'cat > data/raw_parts/part1/file_{#}.csv'
%%bash
# create matching output folders for the converted Parquet files
for i in {1..7}
do
    mkdir -p data/processed/part${i}
done
import pandas as pd
import os
from dask import delayed, compute

# this can run in parallel
@delayed
def convert2parquet(fn, fldr_in, fldr_out):
    # e.g. data/raw_parts/part1/file_1.csv -> data/processed/part1/file_1.parquet
    fn_out = fn.replace(fldr_in, fldr_out)\
               .replace(".csv", ".parquet")
    df = pd.read_csv(fn)
    df.to_parquet(fn_out, index=False)

# collect every split CSV under data/raw_parts/
jobs = []
fldr_in = "data/raw_parts/"
fldr_out = "data/processed/"  # matches the folders created above
for (dirpath, dirnames, filenames) in os.walk(fldr_in):
    if len(filenames) > 0:
        jobs += [os.path.join(dirpath, fn)
                 for fn in filenames]
%%time
# build the delayed tasks and run them all in parallel
to_process = [convert2parquet(job, fldr_in, fldr_out) for job in jobs]
out = compute(to_process)
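If memory is still the bottleneck, you can cap how many conversions run at once. A minimal sketch, assuming the multiprocessing scheduler (the scheduler choice and worker count are my assumptions, not part of the workflow above):

from dask import compute

# Run at most 4 conversions at a time so only a few chunks are in memory
# simultaneously; adjust num_workers to fit your RAM budget.
out = compute(to_process, scheduler="processes", num_workers=4)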
Upvotes: 1
Reputation: 63
Thanks, all, for the suggestions. map_partitions worked:
df = dd.read_csv(filename, blocksize="500MB",
                 dtype=fColTypes, header=None, sep='|', names=fCSVCols)
df.map_partitions(DoWork, output_parquet_file, chunksize, Schema, CSVCols, fColTypes).compute(num_workers=2)
But the same approach didn't work well with a Dask Distributed local cluster; in local-cluster mode it only worked when the CSV size was under 100 MB.
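For readers following along: DoWork isn't shown above. A minimal sketch of what such a partition worker could look like, assuming it writes each pandas partition into a Parquet dataset (the body and parameter usage are assumptions, not the actual code):

import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical partition worker matching the map_partitions call above: each
# Dask partition arrives as a pandas DataFrame and is appended to a Parquet
# dataset, so partitions never have to be concatenated in memory. Unused
# parameters are kept only to mirror the call signature.
def DoWork(df, output_parquet_file, chunksize, Schema, CSVCols, fColTypes):
    table = pa.Table.from_pandas(df, schema=Schema, preserve_index=False)
    pq.write_to_dataset(table, root_path=output_parquet_file)
    return df  # map_partitions expects a DataFrame-like return value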
Upvotes: 2