Steve OB

Reputation: 63

Convert a csv larger than RAM into parquet with Dask

I have approximately 60,000 small CSV files, varying in size from 1 MB to several hundred MB, that I would like to convert into a single Parquet file. The total size of all the CSVs is around 1.3 TB, which is larger than the memory of the server I am using (678 GB available).

Since all the CSVs have the same fields, I've concatenated them into a single large file. I then tried to process this file with Dask:

ddf = dd.read_csv("large.csv", blocksize="1G").to_parquet("large.pqt")

My understanding was that the blocksize option would prevent Dask from running out of memory when the job was split over multiple workers.

What happens is that eventually Dask does run out of memory and I get a bunch of messages like:

distributed.nanny - WARNING - Restarting worker

Is my approach completely wrong or am I just missing an important detail?

Upvotes: 1

Views: 1117

Answers (2)

QuentinSchau

Reputation: 1

I was faced with a similar problem: all workers ended up using more memory than expected, pausing and restarting with the error "distributed.nanny - WARNING - Restarting worker". I was inspired by the solution here and it now works for me with no more errors or warnings. I now only call .compute() on the piece I actually need, without keeping the full computed DataFrame in a variable, as in the example below.

This is my Dask setup code:

import dask
import dask.dataframe as dd
from dask.distributed import LocalCluster,Client
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os

if __name__ == "__main__":

    # databaseDirPath and base are defined elsewhere in the script
    dask.config.set({"temporary-directory": "{0}{1}/temp".format(databaseDirPath, base)})

    # Cluster parameters
    cluster = LocalCluster(
        n_workers=8,
        threads_per_worker=1,
        memory_limit='auto',
        interface='lo',
    )
    client = Client(cluster)
    
# Some code ...

The header of my .csv:

feat1;feat2;feat3;feat4;feat5;feat6;feat7;feat8;feat9;feat10;feat11;feat12;feat13;feat14;feat15;feat16;feat17;feat18;feat19;feat20;feat21;feat22;feat23;feat24;feat25;feat26;feat27;feat28;feat29;feat30;feat31;feat32;feat33;feat34;feat35;feat36;feat37;feat38;feat39;feat40;feat41;feat42;feat43;feat44;feat45;feat46;feat47;feat48;feat49;feat50;feat51;feat52;feat53;feat54;feat55;feat56;feat57;feat58;feat59;feat60;feat61;feat62;feat63;feat64;feat65;number jobs;level(obj Value);depth;nbPruned

Before:

df = dd.read_csv("{0}.csv/x00*".format(databasePath), sep=";", blocksize="1MB", dtype=np.float64)

for index, job in enumerate(jobs):
    print("calculation in progress for job : {0}".format(job))
    # materialises the whole filtered frame as a pandas DataFrame in memory
    df2 = df[df["number jobs"] == job].compute()
    for idx, var in enumerate(dfColum.columns):
        print("feature : {0} with {1} jobs".format(var, job))
        cpt, edges = np.histogram(df2[var], bins=bins)
        frq = cpt / cpt.sum()
        # use this for histogram ...
df.to_parquet("{0}.parquet/".format(databasePath))

After this change I even increased the blocksize and processed the whole database, i.e. 500 GB of CSV files, whereas before I could only handle a 10 GB database with a 1 MB blocksize.

df = dd.read_csv("{0}.csv/*".format(databasePath), sep=";", blocksize="64MB", dtype=np.float64)

for index, job in enumerate(jobs):
    print("calculation in progress for job : {0}".format(job))
    # stays lazy: no .compute() on the filtered frame
    df2 = df[df["number jobs"] == job]
    for idx, var in enumerate(dfColum.columns):
        # compute only the single column needed for the histogram
        cpt, edges = np.histogram(df2[var].compute(), bins=bins)
        frq = cpt / cpt.sum()
        # use this for histogram ...

df.to_parquet("{0}.parquet/".format(databasePath))

Upvotes: 0

MRocklin

Reputation: 57251

You don't have to concatenate all of your files into one large file. dd.read_csv is happy to accept a list of filenames, or a string with a "*" in it.
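For example, a minimal sketch of reading the original files directly with a glob pattern, assuming they live in a hypothetical csvs/ directory:

import dask.dataframe as dd

# Hypothetical path: point the glob at the 60,000 original CSVs instead of
# concatenating them first; Dask creates partitions per file / per block.
ddf = dd.read_csv("csvs/*.csv", blocksize="256MB")

# to_parquet writes a directory of part files rather than one physical file.
ddf.to_parquet("large.pqt")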

If you have text data in your CSV file, then loading it into pandas or dask dataframes can expand the amount of memory used considerably, so your 1GB chunks might be quite a bit bigger than you expect. Do things work if you use a smaller chunk size? You might want to consult this doc entry: https://docs.dask.org/en/latest/best-practices.html#avoid-very-large-partitions
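As a rough illustration of the effect of partition size, assuming the same hypothetical csvs/ directory, you could compare how much memory one partition actually occupies once loaded:

import dask.dataframe as dd

# A much smaller blocksize than 1 GB keeps each in-memory pandas partition small;
# "64MB" is an illustrative value, not a prescription.
ddf = dd.read_csv("csvs/*.csv", blocksize="64MB")

# Measure the in-memory footprint of a single partition -- text columns often
# take several times their on-disk size once loaded into pandas.
first_partition_bytes = ddf.partitions[0].memory_usage(deep=True).sum().compute()
print(f"{ddf.npartitions} partitions, first one uses {first_partition_bytes / 1e6:.0f} MB")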

In general I recommend using Dask's dashboard to watch the computation, and see what is taking up your memory. This might help you find a good solution. https://docs.dask.org/en/latest/diagnostics-distributed.html
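For instance, with the distributed scheduler the dashboard address is available on the client object:

from dask.distributed import Client

# Creating a Client with no arguments starts a LocalCluster and its dashboard
# (typically served on port 8787); watch per-worker memory there while the job runs.
client = Client()
print(client.dashboard_link)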

Upvotes: 2
