Reputation: 479
I am basically converting some csv files to parquet. To do so, I decided to use dask: read the csv with dask and write it back out as parquet. I am using a big blocksize as the customer requested (500 MB). The csv files are 15 GB and bigger (up to 50 GB), and the machine has 64 GB of RAM. Whenever I run the basic to_parquet command, RAM usage starts increasing and eventually gets so high that Linux kills the process. Does somebody know why this happens? When I don't specify a blocksize, it works, but it creates a lot of small parquet files (24 MB). Is there a way to solve this and create blocks of at least 500 MB?
import dask.dataframe as dd

_path = 'E://'
dt = dd.read_csv(_path + '//temporal.csv', blocksize=500e5)
dt.to_parquet(path=_path + '/t.parq', compression='gzip')
Upvotes: 4
Views: 3490
Reputation: 57251
You say that your partitions end up as something like 15 GB each. If you're using Dask with many threads, then there might be several of these in memory at once. If you only have 64 GB, then it's quite possible to run out of RAM.
Some options: use a smaller blocksize so that each partition fits comfortably in memory, or run with fewer threads (or the single-threaded scheduler) so that fewer partitions are held in memory at once.
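A minimal sketch of both options (the 100 MB blocksize is only an illustrative value; the paths follow the question):

import dask
import dask.dataframe as dd

# option 1: smaller partitions, so each block fits comfortably in memory
dt = dd.read_csv('E://temporal.csv', blocksize=100e6)  # ~100 MB blocks

# option 2: the single-threaded scheduler processes partitions one at a time
with dask.config.set(scheduler='single-threaded'):
    dt.to_parquet('E:/t.parq', compression='gzip')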
Upvotes: 2
Reputation: 13437
Please take this as an elaborated comment. Why not first split the files into 500 MB csv chunks and then convert them to parquet with dask?
import pandas as pd
import numpy as np
import os
fldr = "data/splitted"
fldr_out = "data/splitted_parquet"
os.makedirs(fldr)
os.makedirs(fldr_out)
# this generates a ~4 GB csv
rows = int(1e7)
cols = 20
df = pd.DataFrame(np.random.rand(rows, cols),
                  columns=["col_{}".format(i) for i in range(cols)])
df.to_csv("data/file.csv", index=False)
In Linux you can strip the header line and split the rest into 500 MB chunks of complete lines with
tail -n +2 data/file.csv | split -C 500M --additional-suffix=".csv" - data/splitted/file_part_
Now you can convert to parquet with dask
from dask import compute, delayed
@delayed
def csv2parq(fn):
    # mirror the input path in the output folder
    out = fn.replace(fldr, fldr_out)
    # the split chunks carry no header row, so take the column names
    # from the header of the original file
    names = pd.read_csv("data/file.csv", nrows=0).columns
    pd.read_csv(fn, header=None, names=names).to_parquet(out)
fns = os.listdir(fldr)
fns = [os.path.join(fldr, fn) for fn in fns]
compute([csv2parq(fn) for fn in fns])
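As an optional refinement (a sketch; num_workers=4 is just an illustrative value), the compute call can cap how many chunks are processed at once:

# run the delayed tasks in separate processes, at most num_workers at a time,
# so only a few ~500 MB chunks sit in memory simultaneously
compute([csv2parq(fn) for fn in fns], scheduler="processes", num_workers=4)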
Upvotes: 1