Reputation: 81
Merging multiple dask dataframes crashes my computer.
Hi,
I am attempting to merge a long list of csv files with dask. Each csv file contains a list of timestamps when a variable has changed its value, together with the value; e.g. for variable1 we have:
timestamp; value
2016-01-01T00:00:00; 3
2016-01-03T00:00:00; 4
while for variable 2, we have:
timestamp; value
2016-01-02T00:00:00; 8
2016-01-04T00:00:00; 9
The timestamps in each csv can differ (as they are linked to the moment a variable has changed value). As the end result, I want to obtain an HDF file in which each variable has a value at every occurring timestamp, forward filled. Hence, something like the following:
timestamp; var1; var2
2016-01-01T00:00:00; 3 ; nan
2016-01-02T00:00:00; 3 ; 8
2016-01-03T00:00:00; 4 ; 8
2016-01-04T00:00:00; 4 ; 9
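In plain pandas, with two tiny frames, the transformation I am after would look roughly like this (an illustrative sketch with made-up names, not my actual data):
import pandas as pd
# two toy variables, each indexed by the timestamps at which it changed value
var1 = pd.DataFrame({'var1': [3, 4]},
                    index=pd.to_datetime(['2016-01-01', '2016-01-03']))
var2 = pd.DataFrame({'var2': [8, 9]},
                    index=pd.to_datetime(['2016-01-02', '2016-01-04']))
# outer join on the timestamps, then forward fill the gaps
result = var1.join(var2, how='outer').ffill()
# result:
#             var1  var2
# 2016-01-01   3.0   NaN
# 2016-01-02   3.0   8.0
# 2016-01-03   4.0   8.0
# 2016-01-04   4.0   9.0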
Below, I provide the meta code which I am using to achieve this parsing and merging.
# imports
import os
from pathlib import Path
from functools import partial
import pandas as pd
import dask.dataframe as dd
import dask.bag as db
from dask import delayed
from dask.diagnostics import ProgressBar
# define how to parse the dates
def parse_dates(df):
    return pd.to_datetime(df['timestamp'], format='%Y-%m-%dT%H:%M:%S', errors='coerce')
# parse a csv file to a filtered dask dataframe
def parse_csv2filtered_ddf(fn_file, source_dir):
    fn = source_dir.joinpath(fn_file)
    ddf = dd.read_csv(str(fn), sep=';', usecols=['timestamp', 'value'],
                      blocksize=10000000, dtype={'value': 'object'})
    meta = ('timestamp', 'datetime64[ns]')
    ddf['timestamp'] = ddf.map_partitions(parse_dates, meta=meta)
    v = fn_file.split('.csv')[0]
    ddf = ddf.dropna() \
             .rename(columns={'value': v}) \
             .set_index('timestamp')
    return ddf
# define how to merge two dataframes
def merge_ddf(x, y):
    ddf = x.merge(y, how='outer', left_index=True, right_index=True, npartitions=4)
    return ddf
# set source directory
source_dir = Path('/path_to_list_of_csv_files/')
# get list of files to parse
lcsv = os.listdir(source_dir)
# make partial function to fix the source directory
parse_csv2filtered_ddf_partial = partial(parse_csv2filtered_ddf, source_dir=source_dir)
# make a bag of dataframes
b = db.from_sequence(lcsv).map(parse_csv2filtered_ddf_partial)
# merge all dataframes and reduce to 1 dataframe
df = b.fold(binop=merge_ddf)
# forward fill the NaNs and drop the remaining
#
# please note that I am choosing here npartitions equal to 48 as
# experiments with smaller sets of data allow me to estimate
# the output size of the df, which should be around 48 GB; hence
# choosing 48 should lead to partitions of 1 GB each, I guess.
df = delayed(df).repartition(npartitions=48) \
                .fillna(method='ffill') \
                .dropna()
# write output to hdf file (output_fn is a placeholder for the target path)
output_fn = 'output.hdf'
df = df.to_hdf(output_fn, '/data')
# start computation
with ProgressBar():
    df.compute(scheduler='threads')
Unfortunately, this script never runs to a successful end. Monitoring the memory usage, I can watch it climb steadily until the memory is exhausted, at which point either the program or the whole computer crashes.
I have also tried using only a single thread in combination with multiple processes, e.g.
import dask
dask.config.set(scheduler='single-threaded')
in combination with
with ProgressBar():
    df.compute(scheduler='processes', num_workers=3)
but also without any success.
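As a side note on that combination: my understanding is that a scheduler= keyword passed to compute() takes precedence over dask.config.set, so a variant that selects the process-based scheduler only through the configuration would look roughly like the sketch below (worker count passed to compute; this is just how I would write it, not something that changes the outcome for me):
import dask
from dask.diagnostics import ProgressBar

# select the local process-based scheduler via the config context manager;
# num_workers is forwarded by compute() to that scheduler
with dask.config.set(scheduler='processes'):
    with ProgressBar():
        df.compute(num_workers=3)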
Any pointers in the right direction are warmly welcomed.
EDIT
Below, I provide a more concise script that generates similar data and reproduces the MemoryError.
import numpy as np
import pandas as pd
from dask import delayed
from dask import dataframe as dd
from dask import array as da
from dask import bag as db
from dask.diagnostics import ProgressBar
from datetime import datetime
from datetime import timedelta
from functools import partial
def make_ddf(col, values, timestamps):
    n = int(col) % 2
    idx_timestamps = timestamps[n::2]
    df = pd.DataFrame.from_dict({str(col): values, 'timestamp': idx_timestamps})
    ddf = dd.from_pandas(df, chunksize=100000000)
    ddf = ddf.dropna() \
             .set_index('timestamp')
    return ddf
def merge_ddf(x, y):
    ddf = x.merge(y, how='outer', left_index=True, right_index=True, npartitions=4)
    return ddf
N_DF_TO_MERGE = 55 # number of dataframes to merge
N_PARTITIONS_REPARTITION = 55
values = np.random.randn(5000000, 1).flatten()
timestamps = [datetime.now() + timedelta(seconds=i*1) for i in range(10000000)]
columns = list(range(N_DF_TO_MERGE))
# fix values and times
make_ddf_partial = partial(make_ddf, values=values, timestamps=timestamps)
# make bag
b = db.from_sequence(columns).map(make_ddf_partial)
# merge all dataframes and reduce to one
df = b.fold(binop=merge_ddf)
# forward fill the NaNs and drop the remaining
df = delayed(df).repartition(npartitions=N_PARTITIONS_REPARTITION) \
                .fillna(method='ffill') \
                .dropna()
# write output to hdf file
df = df.to_hdf('magweg.hdf', '/data')
with ProgressBar():
    df.compute(scheduler='threads')
which leads to the following error:
Traceback (most recent call last):
  File "mcve.py", line 63, in <module>
    main()
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\click\core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\click\core.py", line 717, in main
    rv = self.invoke(ctx)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\click\core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\click\core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "mcve.py", line 59, in main
    df.compute(scheduler='threads')
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\base.py", line 398, in compute
    results = schedule(dsk, keys, **kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\threaded.py", line 76, in get
    pack_exception=pack_exception, **kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\local.py", line 459, in get_async
    raise_exception(exc, tb)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\compatibility.py", line 112, in reraise
    raise exc
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\local.py", line 230, in execute_task
    result = _execute_task(task, data)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\core.py", line 119, in _execute_task
    return func(*args2)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\utils.py", line 697, in __call__
    return getattr(obj, self.method)(*args, **kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\dataframe\core.py", line 1154, in to_hdf
    return to_hdf(self, path_or_buf, key, mode, append, **kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\dataframe\io\hdf.py", line 227, in to_hdf
    scheduler=scheduler, **dask_kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\base.py", line 166, in compute_as_if_collection
    return schedule(dsk2, keys, **kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\threaded.py", line 76, in get
    pack_exception=pack_exception, **kwargs)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\local.py", line 459, in get_async
    raise_exception(exc, tb)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\compatibility.py", line 112, in reraise
    raise exc
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\local.py", line 230, in execute_task
    result = _execute_task(task, data)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\core.py", line 119, in _execute_task
    return func(*args2)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\dask\dataframe\methods.py", line 103, in boundary_slice
    result = getattr(df, kind)[start:stop]
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\indexing.py", line 1500, in __getitem__
    return self._getitem_axis(maybe_callable, axis=axis)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\indexing.py", line 1867, in _getitem_axis
    return self._get_slice_axis(key, axis=axis)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\indexing.py", line 1536, in _get_slice_axis
    return self._slice(indexer, axis=axis, kind='iloc')
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\indexing.py", line 151, in _slice
    return self.obj._slice(obj, axis=axis, kind=kind)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\generic.py", line 3152, in _slice
    result = self._constructor(self._data.get_slice(slobj, axis=axis))
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\internals\managers.py", line 700, in get_slice
    bm._consolidate_inplace()
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\internals\managers.py", line 929, in _consolidate_inplace
    self.blocks = tuple(_consolidate(self.blocks))
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\internals\managers.py", line 1899, in _consolidate
    _can_consolidate=_can_consolidate)
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\pandas\core\internals\blocks.py", line 3146, in _merge_blocks
    new_values = np.vstack([b.values for b in blocks])
  File "C:\Users\tomasvanoyen\Miniconda3\envs\stora\lib\site-packages\numpy\core\shape_base.py", line 234, in vstack
    return _nx.concatenate([atleast_2d(_m) for _m in tup], 0)
MemoryError
Upvotes: 4
Views: 1327
Reputation: 57251
Two things seem odd.
Upvotes: 0