Jonatan Aponte

Reputation: 479

Writing huge dask dataframes to parquet fails out of memory

I am basically converting some CSV files to parquet. To do so, I decided to use dask: read the CSV with dask and write it back to parquet. I am using a big blocksize, as the customer requested (500 MB). The CSVs are 15 GB and larger (up to 50 GB), and the machine has 64 GB of RAM. Whenever I run the basic to_parquet command, RAM usage keeps increasing until it gets so high that Linux kills the process. Does somebody know why this happens? When I don't specify a blocksize it works, but it creates a lot of small parquet files (24 MB). Is there a way to solve this while creating blocks of at least 500 MB?

import dask.dataframe as dd

_path = 'E://'

# blocksize is given in bytes (500e5 bytes == 50 MB)
dt = dd.read_csv(_path+'//temporal.csv', blocksize=500e5)

dt.to_parquet(path=_path+'/t.parq', compression='gzip')

Upvotes: 4

Views: 3490

Answers (2)

MRocklin

Reputation: 57251

You say that your partitions end up as something like 15 GB each. If you are using Dask with many threads, then several of these may be in memory at once. With only 64 GB of RAM, it is quite possible to run out of memory.

Some options (a sketch combining them follows below):

  1. Use a smaller chunk size
  2. Encode your data differently so that it is more space efficient. For example, text data (a common cause of memory blowup in Python) might be stored more efficiently as categoricals
  3. Use fewer threads
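
A minimal sketch combining all three, based on the code in the question (the "100MB" blocksize and the 'some_text_col' column name are only illustrative placeholders, not values from the question):

import dask
import dask.dataframe as dd

_path = 'E://'

# 1. smaller chunks: 100 MB partitions (illustrative value)
dt = dd.read_csv(_path + '//temporal.csv',
                 blocksize="100MB",
                 # 2. store repetitive text columns as categoricals
                 #    ('some_text_col' is a hypothetical column name)
                 dtype={'some_text_col': 'category'})

# 3. fewer threads: the single-threaded scheduler keeps only one
#    partition in memory at a time while writing
with dask.config.set(scheduler='single-threaded'):
    dt.to_parquet(_path + '/t.parq', compression='gzip')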

Upvotes: 2

rpanai

Reputation: 13437

Please take this as an elaborated comment. Why not first split the files into 500 MB CSVs and then convert them to parquet with dask?

import pandas as pd
import numpy as np
import os

fldr = "data/splitted"
fldr_out = "data/splitted_parquet"
os.makedirs(fldr, exist_ok=True)
os.makedirs(fldr_out, exist_ok=True)

# generate a ~4 GB csv of random numbers
rows = int(1e7)
cols = 20
df = pd.DataFrame(np.random.rand(rows, cols),
                  columns=["col_{}".format(i) for i in range(cols)])

df.to_csv("data/file.csv")

On Linux you can split it into 500 MB files (run from inside the data folder; note that split -b cuts at byte boundaries, so rows can end up broken mid-line) with

split -b 500M --additional-suffix=".csv" file.csv splitted/file_part_

Now you can convert the pieces to parquet in parallel with dask.delayed

import os
import pandas as pd
from dask import compute, delayed

@delayed
def csv2parq(fn):
    # write each csv part to the output folder, swapping the extension
    out = fn.replace(fldr, fldr_out).replace(".csv", ".parquet")
    pd.read_csv(fn).to_parquet(out)

fns = os.listdir(fldr)
fns = [os.path.join(fldr, fn) for fn in fns]
compute([csv2parq(fn) for fn in fns])
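
A quick way to sanity-check the result (assuming pyarrow or fastparquet is installed, and using fldr_out from the snippet above) is to read the parts back as a single dask dataframe:

import dask.dataframe as dd

# read all the parquet parts back as one dask dataframe
ddf = dd.read_parquet(fldr_out)
print(ddf.npartitions, len(ddf.columns))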

Upvotes: 1
