Reputation: 122142
Given 3 csv files with the same number of rows, like these:
fx.csv:
7.23,4.41,0.17453,0.12
6.63,3.21,0.3453,0.32
2.27,2.21,0.3953,0.83
f0.csv:
1.23,3.21,0.123,0.12
8.23,9.21,0.183,0.32
7.23,6.21,0.123,0.12
and f1.csv:
6.23,3.21,0.153,0.123
2.23,2.26,0.182,0.22
9.23,9.21,0.183,0.135
The f0.csv and f1.csv files come with corresponding labels, 0s and 1s respectively.
The goal is to read the concatenated values into a dask.DataFrame, such that we get:
fx.csv concatenated horizontally with f0.csv and the 0s
fx.csv concatenated horizontally with f1.csv and the 1s
I have tried doing this to read them into a dask DataFrame and save it into an HDF store:
import numpy as np
import dask.dataframe as dd
import dask.array as da

fx = dd.read_csv('fx.csv', header=None)
f0 = dd.read_csv('f0.csv', header=None)
f1 = dd.read_csv('f1.csv', header=None)

# Label columns: 0s for f0 and 1s for f1.
l0 = dd.from_array(np.array([0] * len(fx)))
l1 = dd.from_array(np.array([1] * len(fx)))

da.to_npy_stack('data/',
                da.concatenate([
                    dd.concat([fx.compute(), f0.compute(), l0.compute()], axis=1),
                    dd.concat([fx.compute(), f1.compute(), l1.compute()], axis=1)
                ], axis=0, allow_unknown_chunksizes=True),
                axis=0)
I can also do this in Unix before reading it into dask, like this:
# Create the label files.
$ wc -l fx.csv
4
$ seq 4 | sed "c 0" > l0.csv
$ seq 4 | sed "c 1" > l1.csv
# Concat horizontally
$ paste fx.csv f0.csv l0.csv -d"," > x0.csv
$ paste fx.csv f1.csv l1.csv -d"," > x1.csv
$ cat x0.csv x1.csv > data.csv
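A minimal sketch of the follow-up read/write step, assuming the combined data.csv from above, a PyTables installation, and illustrative names data.h5 / /data for the output store:
import dask.dataframe as dd

# Read the pre-concatenated file lazily and write it out to an HDF5 store.
# 'data.h5' and the '/data' key are illustrative names; requires PyTables.
df = dd.read_csv('data.csv', header=None)
df.to_hdf('data.h5', '/data')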
The actual dataset has 256 columns in each of the f*.csv files and 22,000,000 rows, so it isn't easy to run the dask Python code on it.
My questions (in parts) are:
Is the dask method in the Python code the easiest / most memory-efficient way to read the data and output it into an HDF5 store?
Is there any other method that is more efficient than the Unix way described above?
Upvotes: 0
Views: 519
Reputation: 1256
You can read the files line by line and build a new .csv from them, instead of loading all of the data into your RAM first. The code below does it for you:
FILE_PATHS = [
    '/home/amir/data/1.csv',
    '/home/amir/data/2.csv',
    '/home/amir/data/3.csv',
]
NEW_FILE_PATH = '/home/amir/data/new.csv'

# Stream each input file into the output file, one line at a time.
fout = open(NEW_FILE_PATH, 'w')
for file_path in FILE_PATHS:
    with open(file_path, 'r') as fin:
        for line in fin:
            fout.write(line)
fout.close()
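For the horizontal concatenation with labels described in the question, the same streaming idea could be extended along these lines (a sketch only; the helper name paste_with_label and the output names x0.csv/x1.csv are illustrative):
# Pair up the lines of two equally long CSV files and append a label column,
# without loading either file into memory. The helper name and the output
# file names are illustrative.
def paste_with_label(left_path, right_path, label, out_path):
    with open(left_path) as fleft, open(right_path) as fright, open(out_path, 'w') as fout:
        for left, right in zip(fleft, fright):
            fout.write(left.rstrip('\n') + ',' + right.rstrip('\n') + ',' + str(label) + '\n')

paste_with_label('fx.csv', 'f0.csv', 0, 'x0.csv')
paste_with_label('fx.csv', 'f1.csv', 1, 'x1.csv')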
About your questions:
Upvotes: 1
Reputation: 16571
The code below is a modified version of your snippet.
When reading CSVs, the allocation of lines across partitions is based on a chunk size, so basic concat operations are not guaranteed to work out of the box because the partitions might not be aligned. To resolve this, index the data.
Next, the columns of 0s/1s can be created with the .assign
method (it works the same as in pandas
). Before saving the array, you might also want to rechunk as described in this answer, but that's optional.
import dask.dataframe as dd
import dask.array as da
def _index_ddf(df):
    """Generate a unique row-based index. See also https://stackoverflow.com/a/65839787/10693596"""
    df['new_index'] = 1
    df['new_index'] = df['new_index'].cumsum()
    df = df.set_index('new_index', sorted=True)
    return df
fx = dd.read_csv('fx.csv', header=None)
fx = _index_ddf(fx)
f0 = dd.read_csv('f0.csv', header=None)
f0 = _index_ddf(f0)
f1 = dd.read_csv('f1.csv', header=None)
f1 = _index_ddf(f1)
# columns of 0/1 can be created by assignment
A1 = dd.concat([fx, f0], axis=1).assign(zeros=0).to_dask_array(lengths=True)
A2 = dd.concat([fx, f1], axis=1).assign(ones=1).to_dask_array(lengths=True)
# stack
A = da.concatenate([A1, A2], axis=0)
# save
da.to_npy_stack('data/', A, axis=0)
# optional: to have even-sized chunks, you can rechunk the data, see https://stackoverflow.com/a/73218995/10693596
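Since the question mentions an HDF5 store, the stacked array A could also be written to HDF5 instead of a .npy stack, e.g. with dask.array's to_hdf5 (a sketch; it requires h5py, and data.h5 / the /x dataset name are only illustrative):
# Optional alternative to the .npy stack: write the stacked array to an HDF5
# dataset. Chunk sizes are known here because the arrays were built with
# lengths=True. Requires h5py; 'data.h5' and '/x' are illustrative names.
da.to_hdf5('data.h5', '/x', A)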
Upvotes: 2