Reputation: 8203
The following operation works, but takes nearly 2h:
from dask import dataframe as ddf
ddf.read_csv('data.csv').to_parquet('data.pq')
Is there a way to parallelize this?
The file data.csv
is ~2G uncompressed with 16 million rows by 22 columns.
Upvotes: 4
Views: 1748
Reputation: 19308
Writing out multiple CSV files in parallel is also easy with the repartition
method:
df = dd.read_csv('data.csv')
df = df.repartition(npartitions=20)
df.to_parquet('./data_pq', write_index=False, compression='snappy')
Dask likes working with partitions that are around 100 MB, so 20 partitions should be good for a 2GB dataset.
You could also speed this up, by splitting the CSV before reading it, so Dask can read the CSV files in parallel. You could use these tactics to break up the 2GB CSV into 20 different CSVs and then write them out without repartitioning:
df = dd.read_csv('folder_with_small_csvs/*.csv')
df.to_parquet('./data_pq', write_index=False, compression='snappy')
Upvotes: 0
Reputation: 1188
read_csv
has a blocksize
parameter (docs) that you can use to control the size of the resulting partition and hence, the number of partitions. That, from what I understand, will result in reading the partitions in parallel - each worker will read block size at relevant offset.
You can set blocksize
so that it yields the required number of partition to take advantage of the cores you have. For example
cores = 8
size = os.path.getsize('data.csv')
ddf = dd.read_csv("data.csv", blocksize=np.rint(size/cores))
print(ddf.npartitions)
Will output:
8
Better yet, you can try to modify the size so that the resulting parquet has partitions of recommended size (which I have seen different numbers in different places :-|).
Upvotes: 0
Reputation: 13437
I'm not sure if it is a problem with data or not. I made a toy example on my machine and the same command takes ~9 seconds
import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.distributed import Client
import dask
client = Client()
# if you wish to connect to the dashboard
client
# fake df size ~2.1 GB
# takes ~180 seconds
N = int(5e6)
df = pd.DataFrame({i: np.random.rand(N)
for i in range(22)})
df.to_csv("data.csv", index=False)
# the following takes ~9 seconds on my machine
dd.read_csv("data.csv").to_parquet("data_pq")
Upvotes: 3