alvas

Reputation: 122142

How to efficiently concat csv files in dask horizontally, then vertically?

Given 3 csv files with the same number of rows, like these:

fx.csv:

7.23,4.41,0.17453,0.12
6.63,3.21,0.3453,0.32
2.27,2.21,0.3953,0.83

f0.csv:

1.23,3.21,0.123,0.12
8.23,9.21,0.183,0.32
7.23,6.21,0.123,0.12

and f1.csv:

6.23,3.21,0.153,0.123
2.23,2.26,0.182,0.22
9.23,9.21,0.183,0.135

The files f0.csv and f1.csv come with corresponding labels 0 and 1, respectively.

The goal is to read them into a dask.DataFrame and concatenate the values such that we get (see the sketch after this list):

  1. fx.csv concatenated horizontally with f0.csv and 0s
  2. fx.csv concatenated horizontally with f1.csv and 1s
  3. concatenated (1) and (2) vertically
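
For the toy files above, the intended result is a 6 x 9 table. A minimal pandas sketch of that target, just to illustrate the shape (pandas is used here only for illustration, not as the proposed solution; the column handling is arbitrary):

import pandas as pd

fx = pd.read_csv('fx.csv', header=None)
f0 = pd.read_csv('f0.csv', header=None)
f1 = pd.read_csv('f1.csv', header=None)

# fx | f0 | label 0, stacked on top of fx | f1 | label 1
top = pd.concat([fx, f0], axis=1, ignore_index=True)
top['label'] = 0
bottom = pd.concat([fx, f1], axis=1, ignore_index=True)
bottom['label'] = 1
target = pd.concat([top, bottom], axis=0, ignore_index=True)  # 6 rows x 9 columns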

I have tried the following to read them into dask and save the result into a store:

import numpy as np
import dask.dataframe as dd
import dask.array as da

fx = dd.read_csv('fx.csv', header=None)
f0 = dd.read_csv('f0.csv', header=None)
f1 = dd.read_csv('f1.csv', header=None)

l0 = dd.from_array(np.array([0] * len(fx)))  # label column of 0s
l1 = dd.from_array(np.array([1] * len(fx)))  # label column of 1s

da.to_npy_stack('data/',
  da.concatenate([
    dd.concat([fx.compute(), f0.compute(), l0.compute()], axis=1),
    dd.concat([fx.compute(), f1.compute(), l1.compute()], axis=1)
    ], axis=0, allow_unknown_chunksizes=True),
  axis=0)

I can also do this in unix before reading it into dask, like this:

# Create the label files.
$ wc -l fx.csv
3 fx.csv

$ seq 3 | sed "c 0" > l0.csv
$ seq 3 | sed "c 1" > l1.csv

# Concat horizontally
$ paste fx.csv f0.csv l0.csv -d"," > x0.csv
$ paste fx.csv f1.csv l1.csv -d"," > x1.csv

$ cat x0.csv x1.csv > data.csv

The actual dataset has 256 columns in each f*.csv file and 22,000,000 rows, so it isn't easy to run the dask Python code above.

My questions (in parts) are:

  1. Is the dask method in the Python code the easiest/most memory-efficient way to read the data and output it into an HDF5 store?

  2. Is there any other method that is more efficient than the unix way described above?

Upvotes: 0

Views: 519

Answers (2)

Amir Hossein Shahdaei

Reputation: 1256

You can read the files line by line and build the new .csv from them, instead of loading all of the data into RAM first. The code below does it for you:

FILE_PATHS = [
    '/home/amir/data/1.csv',
    '/home/amir/data/2.csv',
    '/home/amir/data/3.csv',
]
NEW_FILE_PATH = '/home/amir/data/new.csv'

# stream each input file into the output one line at a time,
# so nothing is ever fully loaded into memory
with open(NEW_FILE_PATH, 'w') as fout:
    for file_path in FILE_PATHS:
        with open(file_path, 'r') as fin:
            for line in fin:
                fout.write(line)
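
The snippet above covers the vertical concatenation; the horizontal paste plus the label column can be handled in the same streaming fashion, for example (a sketch with assumed file names, not part of the original answer):

# paste fx.csv and f0.csv side by side, appending a constant label,
# again without loading either file into memory
with open('fx.csv') as fx, open('f0.csv') as f0, open('x0.csv', 'w') as fout:
    for left, right in zip(fx, f0):
        fout.write(left.rstrip('\n') + ',' + right.rstrip('\n') + ',0\n')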

About your questions:

  1. As long as you read the files line by line, it is efficient no matter which language you use.
  2. You should really try pyspark. It reads, transforms, and writes data in parallel in a very clever way :) (see the sketch below)
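
For instance, once the horizontal paste has been done (e.g. with the unix paste step from the question), the vertical concatenation and the parallel write can be left to pyspark. A minimal sketch (the output path data_out is just a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# read the horizontally pasted files (x0.csv / x1.csv from the question),
# stack them, and write the result out in parallel
x0 = spark.read.csv('x0.csv')
x1 = spark.read.csv('x1.csv')
x0.unionByName(x1).write.csv('data_out', mode='overwrite')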

Upvotes: 1

SultanOrazbayev

Reputation: 16571

The code below is a modified version of your snippet.

When reading csv, the allocation of lines across partitions is based on a chunk size, so basic concat operations are not guaranteed to work out of the box because the partitions might not be aligned. To resolve it, index the data.
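
For example, you can check how the rows end up spread across partitions before concatenating (a small diagnostic sketch; the blocksize value is arbitrary):

import dask.dataframe as dd

fx = dd.read_csv('fx.csv', header=None, blocksize='16MB')
f0 = dd.read_csv('f0.csv', header=None, blocksize='16MB')

# rows per partition; if these differ between the files, a positional
# concat along axis=1 would not line the rows up correctly
print(fx.map_partitions(len).compute())
print(f0.map_partitions(len).compute())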

Next, the columns of 0s/1s can be created with the .assign method (it works the same way as in pandas). Before saving the array, you might also want to rechunk as described in this answer, but that's optional.

import dask.dataframe as dd
import dask.array as da

def _index_ddf(df):
    """Generate a unique row-based index. See also https://stackoverflow.com/a/65839787/10693596"""
    df['new_index'] = 1
    df['new_index'] = df['new_index'].cumsum()
    df = df.set_index('new_index', sorted=True)
    return df

fx = dd.read_csv('fx.csv', header=None)
fx = _index_ddf(fx)

f0 = dd.read_csv('f0.csv', header=None)
f0 = _index_ddf(f0)

f1 = dd.read_csv('f1.csv', header=None)
f1 = _index_ddf(f1)

# columns of 0/1 can be created by assignment
A1 = dd.concat([fx, f0], axis=1).assign(zeros=0).to_dask_array(lengths=True)
A2 = dd.concat([fx, f1], axis=1).assign(ones=1).to_dask_array(lengths=True)

# stack
A = da.concatenate([A1, A2], axis=0)

# save
da.to_npy_stack('data/', A, axis=0)

# optional: to get evenly sized chunks, you can rechunk the data, see https://stackoverflow.com/a/73218995/10693596
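
For example, rechunking just before the save might look like this (an illustrative sketch; the chunk size along the first axis is arbitrary):

A = A.rechunk({0: 1_000_000})
da.to_npy_stack('data/', A, axis=0)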

Upvotes: 2
