Reputation: 73
I've just started using Dask, and I'm trying to combine several CSVs that all have the columns listed in `dtypes`. I included the `dtypes` dictionary to coerce the data in the CSVs to those types. Despite that, when I try to save the Dask DataFrame as a Parquet file, I get the error shown below. Oddly enough, the `Dst Port` field is still reported as "int64" when I inspect the DataFrame's dtypes, so I'm not sure how else to coerce the datatype.
Each CSV loads without this issue on its own, in both pandas and Dask; the error only appears once I combine them. Why is this happening, and how do I fix it?
import dask.dataframe as dd
import pandas as pd
from numpy import dtype
import numpy as np
# Columns to load from each CSV, mapped to the dtype each one is coerced to.
dtypes = {
    'Timestamp': dtype('O'),
    'Dst Port': dtype('int64'),
    'Active Max': dtype('float64'),
    'Active Min': dtype('float64'),
    'Idle Mean': dtype('float64'),
    'Idle Std': dtype('float64'),
    'Idle Max': dtype('float64'),
    'Idle Min': dtype('float64'),
    'Label': dtype('O'),
}

# Directory that holds the input CSVs; the combined output is written there too.
root = "/home/haktrak/Public"
input_files = ["DDosHOIC.csv", "DosSlowandHulk.csv"]
# Join with an explicit "/" (as done for the inputs below) so the output lands
# inside `root`; `root + "combined.parq"` would give
# "/home/haktrak/Publiccombined.parq".
output_file = f"{root}/combined.parq"
list_of_csvs = [f'{root}/{file}' for file in input_files]
# Read only the columns named in `dtypes`, taking row 0 of each file as the
# header and coercing every column to its declared dtype.
df = dd.read_csv(list_of_csvs, dtype=dtypes, usecols=dtypes.keys(), header=0)
Output & Traceback
TypeError Traceback (most recent call last)
pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._convert_tokens()
TypeError: Cannot cast array data from dtype('O') to dtype('int64') according to the rule 'safe'
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
<ipython-input-29-e7cebdeea3d9> in <module>
106 df = dd.read_csv(list_of_csvs, dtype=dtypes, usecols=dtypes.keys(), header=0)
--> 108 df.to_parquet(output_file)
109 # root = "csv/"
110 # df1 = dd.read_csv(root+"TEST_ISCX.csv", dtype=dtypes, usecols=dtypes.keys(), header=0)
~/.local/lib/python3.8/site-packages/dask/dataframe/ in to_parquet(self, path, *args, **kwargs)
4371 from .io import to_parquet
-> 4373 return to_parquet(self, path, *args, **kwargs)
4375 @derived_from(pd.DataFrame)
~/.local/lib/python3.8/site-packages/dask/dataframe/io/parquet/ in to_parquet(df, path, engine, compression, write_index, append, overwrite, ignore_divisions, partition_on, storage_options, custom_metadata, write_metadata_file, compute, compute_kwargs, schema, **kwargs)
646 if compute_kwargs is None:
647 compute_kwargs = dict()
--> 648 out = out.compute(**compute_kwargs)
649 return out
~/.local/lib/python3.8/site-packages/dask/ in compute(self, **kwargs)
283 dask.base.compute
284 """
--> 285 (result,) = compute(self, traverse=False, **kwargs)
286 return result
~/.local/lib/python3.8/site-packages/dask/ in compute(*args, **kwargs)
565 postcomputes.append(x.__dask_postcompute__())
--> 567 results = schedule(dsk, keys, **kwargs)
568 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
~/.local/lib/python3.8/site-packages/dask/ in get(dsk, result, cache, num_workers, pool, **kwargs)
77 pool = MultiprocessingPoolExecutor(pool)
---> 79 results = get_async(
80 pool.submit,
81 pool._max_workers,
~/.local/lib/python3.8/site-packages/dask/ in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
512 _execute_task(task, data) # Re-execute locally
513 else:
--> 514 raise_exception(exc, tb)
515 res, worker_id = loads(res_info)
516 state["cache"][key] = res
~/.local/lib/python3.8/site-packages/dask/ in reraise(exc, tb)
323 if exc.__traceback__ is not tb:
324 raise exc.with_traceback(tb)
--> 325 raise exc
~/.local/lib/python3.8/site-packages/dask/ in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
221 try:
222 task, data = loads(task_info)
--> 223 result = _execute_task(task, data)
224 id = get_id()
225 result = dumps((result, id))
~/.local/lib/python3.8/site-packages/dask/ in _execute_task(arg, cache, dsk)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
~/.local/lib/python3.8/site-packages/dask/ in __call__(self, *args)
964 if not len(args) == len(self.inkeys):
965 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 966 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
968 def __reduce__(self):
~/.local/lib/python3.8/site-packages/dask/ in get(dsk, out, cache)
149 for key in toposort(dsk):
150 task = dsk[key]
--> 151 result = _execute_task(task, cache)
152 cache[key] = result
153 result = _execute_task(out, cache)
~/.local/lib/python3.8/site-packages/dask/ in _execute_task(arg, cache, dsk)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
~/.local/lib/python3.8/site-packages/dask/dataframe/io/ in __call__(self, part)
117 # Call `pandas_read_text`
--> 118 df = pandas_read_text(
119 self.reader,
120 block,
~/.local/lib/python3.8/site-packages/dask/dataframe/io/ in pandas_read_text(reader, b, header, kwargs, dtypes, columns, write_header, enforce, path)
169 bio.write(b)
--> 171 df = reader(bio, **kwargs)
172 if dtypes:
173 coerce_dtypes(df, dtypes)
~/.local/lib/python3.8/site-packages/pandas/io/ in read_csv(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, skipfooter, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, doublequote, escapechar, comment, encoding, dialect, error_bad_lines, warn_bad_lines, delim_whitespace, low_memory, memory_map, float_precision, storage_options)
608 kwds.update(kwds_defaults)
--> 610 return _read(filepath_or_buffer, kwds)
~/.local/lib/python3.8/site-packages/pandas/io/ in _read(filepath_or_buffer, kwds)
467 with parser:
--> 468 return parser.read(nrows)
~/.local/lib/python3.8/site-packages/pandas/io/ in read(self, nrows)
1055 def read(self, nrows=None):
1056 nrows = validate_integer("nrows", nrows)
-> 1057 index, columns, col_dict = self._engine.read(nrows)
1059 if index is None:
~/.local/lib/python3.8/site-packages/pandas/io/ in read(self, nrows)
2059 def read(self, nrows=None):
2060 try:
-> 2061 data = self._reader.read(nrows)
2062 except StopIteration:
2063 if self._first_chunk:
pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader.read()
pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._read_low_memory()
pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._read_rows()
pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._convert_column_data()
pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._convert_tokens()
ValueError: invalid literal for int() with base 10: 'Dst Port'
Upvotes: 0
Views: 1158
Reputation: 19348
The 'Dst Port' column contains values that cannot be cast as integers. Let's illustrate with an MCVE.
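Suppose you have a people.csv file with `first_name` and `age` columns, in which one row holds the non-numeric string "hi" in the `age` column (the sample file contents are not reproduced here).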
Try to read this into a Dask DataFrame with the `age` column typed as int64:
import dask.dataframe as dd
# Spell out the dtype wanted for each column, then hand the mapping to Dask.
column_types = {"first_name": "object", "age": "int64"}
ddf = dd.read_csv("people.csv", dtype=column_types)
This doesn't work and returns this error message: ValueError: invalid literal for int() with base 10: 'hi'
The string "hi" is preventing the `age` column from being typed as int64.
Your error is ValueError: invalid literal for int() with base 10: 'Dst Port'. Dask can't coerce your header row to an integer. It looks like you have header rows in your data and have incorrectly set `header=0` in `read_csv`.
Upvotes: 1
Reputation: 16581
Can you try this:
# Load the selected columns without forcing dtypes while parsing ...
df = dd.read_csv(list_of_csvs, usecols=dtypes.keys(), header=0)
# ... then cast to the target dtypes once the dataframe exists.
df = df.astype(dtypes)
The idea here is to apply dtypes after the dataframe was initialized, but before it is saved (I'm not sure it will work in your case, but without a reproducible example...).
Upvotes: 0