Daniel Mahler

Reputation: 8203

parallelize conversion of a single 16M row csv to Parquet with dask

The following operation works, but takes nearly 2h:

from dask import dataframe as ddf
ddf.read_csv('data.csv').to_parquet('data.pq')

Is there a way to parallelize this?

The file data.csv is ~2G uncompressed with 16 million rows by 22 columns.

Upvotes: 4

Views: 1748

Answers (3)

Powers

Reputation: 19308

Writing out multiple Parquet files in parallel is also easy with the repartition method:

import dask.dataframe as dd

df = dd.read_csv('data.csv')
df = df.repartition(npartitions=20)
df.to_parquet('./data_pq', write_index=False, compression='snappy')

Dask likes working with partitions that are around 100 MB, so 20 partitions should be good for a 2GB dataset.
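If you would rather derive the partition count from the file size instead of hard-coding 20, here is a minimal sketch; the ~100 MB-per-partition target and the local data.csv path are assumptions:

import math
import os
import dask.dataframe as dd

# assumed rule of thumb: aim for roughly 100 MB per partition
target_partition_bytes = 100 * 1024 ** 2

size_bytes = os.path.getsize('data.csv')  # ~2 GB in the question
npartitions = max(1, math.ceil(size_bytes / target_partition_bytes))

df = dd.read_csv('data.csv')
df = df.repartition(npartitions=npartitions)
df.to_parquet('./data_pq', write_index=False, compression='snappy')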

You could also speed this up by splitting the CSV before reading it, so that Dask can read the smaller CSV files in parallel. For example, you could break the 2 GB CSV into 20 smaller CSVs and then write them out without repartitioning (a sketch of the splitting step follows the snippet below):

import dask.dataframe as dd

df = dd.read_csv('folder_with_small_csvs/*.csv')
df.to_parquet('./data_pq', write_index=False, compression='snappy')
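The splitting step itself can be a one-off sequential pass. Here is a minimal sketch using pandas' chunked reader; the folder name, file pattern, and ~800k-rows-per-file chunk size are assumptions, chosen to yield roughly 20 files from 16 million rows:

import os
import pandas as pd

os.makedirs('folder_with_small_csvs', exist_ok=True)

# assumed split size: ~800k rows per file -> about 20 files for 16M rows
chunk_rows = 800_000
for i, chunk in enumerate(pd.read_csv('data.csv', chunksize=chunk_rows)):
    chunk.to_csv(f'folder_with_small_csvs/part_{i:03d}.csv', index=False)

The speed-up then comes from the parallel read of the resulting files, not from the split itself.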

Upvotes: 0

Tomer Cagan

Reputation: 1188

read_csv has a blocksize parameter (docs) that you can use to control the size of the resulting partitions and hence their number. From what I understand, this means the partitions are read in parallel: each worker reads its block at the relevant offset.

You can set blocksize so that it yields the required number of partitions to take advantage of the cores you have. For example:

import os
import numpy as np
import dask.dataframe as dd

cores = 8
size = os.path.getsize('data.csv')
ddf = dd.read_csv('data.csv', blocksize=int(np.rint(size / cores)))
print(ddf.npartitions)

Will output:

8

Better yet, you can adjust the block size so that the resulting Parquet file has partitions of the recommended size (I have seen different numbers recommended in different places :-|).
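For example, a minimal sketch that caps the block size at the commonly cited ~100 MB target; the target value and the core count are assumptions:

import os
import dask.dataframe as dd

# assumed guideline: keep each partition around 100 MB
target = 100 * 1024 ** 2

cores = 8
size = os.path.getsize('data.csv')
blocksize = min(target, max(1, size // cores))

ddf = dd.read_csv('data.csv', blocksize=blocksize)
print(ddf.npartitions)
ddf.to_parquet('data_pq')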

Upvotes: 0

rpanai

Reputation: 13437

I'm not sure whether the problem is with the data or not. I made a toy example of roughly the same size on my machine, and the same command takes ~9 seconds:

import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.distributed import Client

client = Client()
# display `client` in a notebook if you want a link to the dashboard
client

# build a fake CSV, ~2.1 GB on disk (5M rows x 22 columns)
# writing it takes ~180 seconds
N = int(5e6)
df = pd.DataFrame({i: np.random.rand(N)
                   for i in range(22)})
df.to_csv("data.csv", index=False)

# the conversion itself takes ~9 seconds on my machine
dd.read_csv("data.csv").to_parquet("data_pq")

Upvotes: 3
