isados

Reputation: 73

Why can't Dask coerce datatypes when trying to save a DataFrame?

I've just started using Dask, and I'm trying to combine several CSVs that all have the columns listed in dtypes. I pass the dtypes dictionary to coerce the data in the CSVs to those types, but despite that, when I try to save the Dask DataFrame as a Parquet file I hit the error below. Oddly enough, the Dst Port column still shows as "int64" when I run df.info(), so I'm not sure how else to coerce the datatype.

The individual CSVs don't have this issue with pandas or Dask on their own; it's only when I combine them that the error is thrown. Why is this happening, and how do I fix it?

import dask.dataframe as dd
import pandas as pd
from numpy import dtype
import numpy as np


dtypes = {'Timestamp': dtype('O'),
 'Dst Port': dtype('int64'),
 'Active Max': dtype('float64'),
 'Active Min': dtype('float64'),
 'Idle Mean': dtype('float64'),
 'Idle Std': dtype('float64'),
 'Idle Max': dtype('float64'),
 'Idle Min': dtype('float64'),
 'Label': dtype('O')}

root = "/home/haktrak/Public"
input_files = ["DDosHOIC.csv", "DosSlowandHulk.csv"]
output_file = f"{root}/combined.parq"
list_of_csvs = [f'{root}/{file}' for file in input_files]

df = dd.read_csv(list_of_csvs, dtype=dtypes, usecols=dtypes.keys(), header=0)

df.to_parquet(output_file)

Output & Traceback

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._convert_tokens()

TypeError: Cannot cast array data from dtype('O') to dtype('int64') according to the rule 'safe'

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
<ipython-input-29-e7cebdeea3d9> in <module>
    106 df = dd.read_csv(list_of_csvs, dtype=dtypes, usecols=dtypes.keys(), header=0)
    107 
--> 108 df.to_parquet(output_file)
    109 # root = "csv/"
    110 # df1 = dd.read_csv(root+"TEST_ISCX.csv", dtype=dtypes, usecols=dtypes.keys(), header=0)

~/.local/lib/python3.8/site-packages/dask/dataframe/core.py in to_parquet(self, path, *args, **kwargs)
   4371         from .io import to_parquet
   4372 
-> 4373         return to_parquet(self, path, *args, **kwargs)
   4374 
   4375     @derived_from(pd.DataFrame)

~/.local/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py in to_parquet(df, path, engine, compression, write_index, append, overwrite, ignore_divisions, partition_on, storage_options, custom_metadata, write_metadata_file, compute, compute_kwargs, schema, **kwargs)
    646         if compute_kwargs is None:
    647             compute_kwargs = dict()
--> 648         out = out.compute(**compute_kwargs)
    649     return out
    650 

~/.local/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    283         dask.base.compute
    284         """
--> 285         (result,) = compute(self, traverse=False, **kwargs)
    286         return result
    287 

~/.local/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    565         postcomputes.append(x.__dask_postcompute__())
    566 
--> 567     results = schedule(dsk, keys, **kwargs)
    568     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    569 

~/.local/lib/python3.8/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     77             pool = MultiprocessingPoolExecutor(pool)
     78 
---> 79     results = get_async(
     80         pool.submit,
     81         pool._max_workers,

~/.local/lib/python3.8/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    512                             _execute_task(task, data)  # Re-execute locally
    513                         else:
--> 514                             raise_exception(exc, tb)
    515                     res, worker_id = loads(res_info)
    516                     state["cache"][key] = res

~/.local/lib/python3.8/site-packages/dask/local.py in reraise(exc, tb)
    323     if exc.__traceback__ is not tb:
    324         raise exc.with_traceback(tb)
--> 325     raise exc
    326 
    327 

~/.local/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    221     try:
    222         task, data = loads(task_info)
--> 223         result = _execute_task(task, data)
    224         id = get_id()
    225         result = dumps((result, id))

~/.local/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/.local/lib/python3.8/site-packages/dask/optimization.py in __call__(self, *args)
    964         if not len(args) == len(self.inkeys):
    965             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 966         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    967 
    968     def __reduce__(self):

~/.local/lib/python3.8/site-packages/dask/core.py in get(dsk, out, cache)
    149     for key in toposort(dsk):
    150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
    152         cache[key] = result
    153     result = _execute_task(out, cache)

~/.local/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/.local/lib/python3.8/site-packages/dask/dataframe/io/csv.py in __call__(self, part)
    116 
    117         # Call `pandas_read_text`
--> 118         df = pandas_read_text(
    119             self.reader,
    120             block,

~/.local/lib/python3.8/site-packages/dask/dataframe/io/csv.py in pandas_read_text(reader, b, header, kwargs, dtypes, columns, write_header, enforce, path)
    169     bio.write(b)
    170     bio.seek(0)
--> 171     df = reader(bio, **kwargs)
    172     if dtypes:
    173         coerce_dtypes(df, dtypes)

~/.local/lib/python3.8/site-packages/pandas/io/parsers.py in read_csv(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, skipfooter, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, doublequote, escapechar, comment, encoding, dialect, error_bad_lines, warn_bad_lines, delim_whitespace, low_memory, memory_map, float_precision, storage_options)
    608     kwds.update(kwds_defaults)
    609 
--> 610     return _read(filepath_or_buffer, kwds)
    611 
    612 

~/.local/lib/python3.8/site-packages/pandas/io/parsers.py in _read(filepath_or_buffer, kwds)
    466 
    467     with parser:
--> 468         return parser.read(nrows)
    469 
    470 

~/.local/lib/python3.8/site-packages/pandas/io/parsers.py in read(self, nrows)
   1055     def read(self, nrows=None):
   1056         nrows = validate_integer("nrows", nrows)
-> 1057         index, columns, col_dict = self._engine.read(nrows)
   1058 
   1059         if index is None:

~/.local/lib/python3.8/site-packages/pandas/io/parsers.py in read(self, nrows)
   2059     def read(self, nrows=None):
   2060         try:
-> 2061             data = self._reader.read(nrows)
   2062         except StopIteration:
   2063             if self._first_chunk:

pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader.read()

pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._read_low_memory()

pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._read_rows()

pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._convert_column_data()

pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._convert_tokens()

ValueError: invalid literal for int() with base 10: 'Dst Port'

Upvotes: 0

Views: 1158

Answers (2)

Powers

Reputation: 19348

The 'Dst Port' column contains values that cannot be cast as integers. Let's illustrate with an MCVE.

Suppose you have the following people.csv file:

first_name,age
bob,34
mary,10
kat,hi

Try to read this into a DataFrame with the age column typed as an int64 column:

import dask.dataframe as dd

ddf = dd.read_csv("people.csv", dtype={"first_name": "object", "age": "int64"})

This doesn't work and returns this error message: ValueError: invalid literal for int() with base 10: 'hi'.

The string "hi" is preventing age from being typed as an int64 column.

Your error is ValueError: invalid literal for int() with base 10: 'Dst Port'. Dask is trying to cast the header text itself to an integer, which means a header row is being read as data: your files appear to contain repeated header rows inside the data, and header=0 in read_csv only treats the very first row as the header, so the stray copies remain.
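If that's what's going on, one possible workaround is to read everything as strings, drop the repeated header rows, and only then cast. This is a sketch reusing list_of_csvs, dtypes and output_file from the question, and it assumes the stray rows are exact repeats of the header:

import dask.dataframe as dd

# Read every selected column as a string so stray header rows don't break the parse
df = dd.read_csv(list_of_csvs, dtype="object", usecols=dtypes.keys(), header=0)

# Drop rows that are just the header repeated inside the data
df = df[df["Dst Port"] != "Dst Port"]

# Cast to the intended dtypes now that the bad rows are gone, then write
df = df.astype(dtypes)
df.to_parquet(output_file)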

Upvotes: 1

SultanOrazbayev

Reputation: 16581

Can you try this:

df = dd.read_csv(list_of_csvs, usecols=dtypes.keys(), header=0).astype(dtypes)

df.to_parquet(output_file)

The idea here is to apply the dtypes after the dataframe has been initialized but before it is saved (I'm not sure it will work in your case, but without a reproducible example it's hard to say).
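As a quick sanity check before writing (just a usage note, since astype is lazy), you can look at the resulting column dtypes without triggering a full compute:

# dtypes is metadata, so this is cheap
print(df.dtypes)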

Upvotes: 0
