Apostolos

Reputation: 8101

dask dataframe read parquet schema difference

I do the following:

import dask.dataframe as dd
from dask.distributed import Client
client = Client()

raw_data_df = dd.read_csv('dataset/nyctaxi/nyctaxi/*.csv', assume_missing=True, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

The dataset comes from a presentation Matthew Rocklin gave, where it was used as a Dask dataframe demo. Then I try to write it to parquet using pyarrow:

raw_data_df.to_parquet(path='dataset/parquet/2015.parquet/') # only pyarrow is installed

Trying to read back:

raw_data_df = dd.read_parquet(path='dataset/parquet/2015.parquet/')

I get the following error:

ValueError: Schema in dataset/parquet/2015.parquet//part.192.parquet was different. 

VendorID: double
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
pickup_longitude: double
pickup_latitude: double
RateCodeID: int64
store_and_fwd_flag: binary
dropoff_longitude: double
dropoff_latitude: double
payment_type: double
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
metadata
--------
{'pandas': '{"pandas_version": "0.22.0", "index_columns": [], "columns": [{"metadata": null, "field_name": "VendorID", "name": "VendorID", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "tpep_pickup_datetime", "name": "tpep_pickup_datetime", "numpy_type": "datetime64[ns]", "pandas_type": "datetime"}, {"metadata": null, "field_name": "tpep_dropoff_datetime", "name": "tpep_dropoff_datetime", "numpy_type": "datetime64[ns]", "pandas_type": "datetime"}, {"metadata": null, "field_name": "passenger_count", "name": "passenger_count", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "trip_distance", "name": "trip_distance", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "pickup_longitude", "name": "pickup_longitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "pickup_latitude", "name": "pickup_latitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "RateCodeID", "name": "RateCodeID", "numpy_type": "int64", "pandas_type": "int64"}, {"metadata": null, "field_name": "store_and_fwd_flag", "name": "store_and_fwd_flag", "numpy_type": "object", "pandas_type": "bytes"}, {"metadata": null, "field_name": "dropoff_longitude", "name": "dropoff_longitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "dropoff_latitude", "name": "dropoff_latitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "payment_type", "name": "payment_type", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "fare_amount", "name": "fare_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "extra", "name": "extra", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "mta_tax", "name": "mta_tax", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "tip_amount", "name": "tip_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "tolls_amount", "name": "tolls_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "improvement_surcharge", "name": "improvement_surcharge", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "total_amount", "name": "total_amount", "numpy_type": "float64", "pandas_type": "float64"}], "column_indexes": []}'}
vs

VendorID: double
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
pickup_longitude: double
pickup_latitude: double
RateCodeID: double
store_and_fwd_flag: binary
dropoff_longitude: double
dropoff_latitude: double
payment_type: double
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
metadata
--------
{'pandas': '{"pandas_version": "0.22.0", "index_columns": [], "columns": [{"metadata": null, "field_name": "VendorID", "name": "VendorID", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "tpep_pickup_datetime", "name": "tpep_pickup_datetime", "numpy_type": "datetime64[ns]", "pandas_type": "datetime"}, {"metadata": null, "field_name": "tpep_dropoff_datetime", "name": "tpep_dropoff_datetime", "numpy_type": "datetime64[ns]", "pandas_type": "datetime"}, {"metadata": null, "field_name": "passenger_count", "name": "passenger_count", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "trip_distance", "name": "trip_distance", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "pickup_longitude", "name": "pickup_longitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "pickup_latitude", "name": "pickup_latitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "RateCodeID", "name": "RateCodeID", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "store_and_fwd_flag", "name": "store_and_fwd_flag", "numpy_type": "object", "pandas_type": "bytes"}, {"metadata": null, "field_name": "dropoff_longitude", "name": "dropoff_longitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "dropoff_latitude", "name": "dropoff_latitude", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "payment_type", "name": "payment_type", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "fare_amount", "name": "fare_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "extra", "name": "extra", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "mta_tax", "name": "mta_tax", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "tip_amount", "name": "tip_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "tolls_amount", "name": "tolls_amount", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "improvement_surcharge", "name": "improvement_surcharge", "numpy_type": "float64", "pandas_type": "float64"}, {"metadata": null, "field_name": "total_amount", "name": "total_amount", "numpy_type": "float64", "pandas_type": "float64"}], "column_indexes": []}'}

But looking at them, they look identical. Any help identifying the reason?

Upvotes: 2

Views: 7919

Answers (2)

tpegbert

Reputation: 186

This question gets at one of the nastier problems in Pandas and Dask: the nullability, or lack thereof, of data types. Missing data causes problems especially for data types, such as integers, that have no missing-data designation.

Floats and datetimes are not too bad, because they have designated null, or missing-value, placeholders (NaN for floating-point values in numpy and NaT for datetimes in pandas) and are therefore nullable. But even those dtypes have problems in some circumstances.

The problem can arise when you read multiple CSV files (as in your case), pull from a database, or merge a small dataframe into a larger one. You can end up with partitions in which some or all values for a given field are missing. For those partitions, Dask and Pandas will assign a dtype for the field that can accommodate the missing-data indicator. In the case of integers, the new dtype will be float, which gets further transformed to double when writing to parquet.
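
The promotion is easy to reproduce in plain pandas (a minimal sketch; the values are made up):

    import numpy as np
    import pandas as pd

    # With no missing values, pandas keeps an integer column as int64.
    s = pd.Series([1, 2, 3])
    print(s.dtype)                          # int64

    # A single missing value forces promotion to float64, because NaN
    # is a float and plain int64 has no missing-value marker.
    print(pd.Series([1, 2, np.nan]).dtype)  # float64

    # The same promotion happens implicitly, e.g. when a reindex
    # introduces rows that have no data.
    print(s.reindex([0, 1, 2, 3]).dtype)    # float64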

Dask will happily list a somewhat misleading dtype for the field, but when you write to parquet, the partitions with missing data get written as something else. In your case, the "int64" got written as "double" in at least one parquet file. Then, when you attempted to read the entire Dask dataframe back, you got the ValueError shown above because of the mismatch.
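
If you want to confirm which part files diverged, you can inspect each file's schema directly with pyarrow (a sketch; assumes a reasonably recent pyarrow and the output path from the question):

    import glob
    import pyarrow.parquet as pq

    # Print the type stored for RateCodeID in every part file; the
    # partitions that contained missing values will show 'double'
    # rather than 'int64'.
    for path in sorted(glob.glob('dataset/parquet/2015.parquet/part.*.parquet')):
        schema = pq.read_schema(path)
        print(path, schema.field('RateCodeID').type)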

Until these problems can be resolved, you need to make sure all of your Dask fields have appropriate data in every row. For example, if you have an int64 field, then NaN values or some other non-integer representation of missing values are not going to work.

Your int64 field may have to be fixed in several steps:

  1. Import Pandas:

    import pandas as pd
    
  2. Clean up the field data as float64, coercing missing values to NaN:

    df['myint64'] = df['myint64'].map_partitions(
        pd.to_numeric,       # runs pd.to_numeric on each pandas partition
        meta='f8',           # tell Dask the result dtype is float64
        errors='coerce'      # unparseable/missing values become NaN
    )
    
  3. Select a sentinel value (e.g., -1.0) to substitute for NaN so that int64 will work:

    df['myint64'] = df['myint64'].where(
        ~df['myint64'].isna(),   # keep values that are present
        -1.0                     # put the sentinel where they are missing
    )
    
  4. Cast your field to int64 and persist it all:

    df['myint64'] = df['myint64'].astype('i8')
    df = client.persist(df)
    
  5. Then try the save-and-reread round trip (see the sketch after the note below).

Note: steps 1-2 by themselves are useful for fixing float64 fields.
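
For step 5, the round trip might look like this (a sketch; the output path is a placeholder):

    import dask.dataframe as dd

    # df is the cleaned dataframe from steps 1-4. With every partition
    # now holding real int64 values, all part files share one schema.
    df.to_parquet('dataset/parquet/2015_fixed.parquet')

    # Reading back should no longer raise the schema ValueError.
    df = dd.read_parquet('dataset/parquet/2015_fixed.parquet')
    print(df.dtypes)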

Finally, to fix a datetime field, try this:

    df['mydatetime'] = df['mydatetime'].map_partitions(
        pd.to_datetime,
        meta='M8[ns]',
        infer_datetime_format=True,
        errors='coerce'
    ).persist()

Upvotes: 4

mdurant

Reputation: 28683

The following two numpy specs disagree:

{'metadata': None, 'field_name': 'RateCodeID', 'name': 'RateCodeID', 'numpy_type': 'int64', 'pandas_type': 'int64'}

RateCodeID: int64 


{'metadata': None, 'field_name': 'RateCodeID', 'name': 'RateCodeID', 'numpy_type': 'float64', 'pandas_type': 'float64'}

RateCodeID: double

(look carefully!)

I suggest you supply dtypes for these columns upon loading, or use astype to coerce them to floats before writing.
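
For example, either approach might look like this (a sketch built on the code from the question):

    import dask.dataframe as dd

    # Option 1: pin the dtype when loading the CSVs.
    raw_data_df = dd.read_csv(
        'dataset/nyctaxi/nyctaxi/*.csv',
        assume_missing=True,
        parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
        dtype={'RateCodeID': 'float64'},
    )

    # Option 2: coerce the column before writing to parquet.
    raw_data_df['RateCodeID'] = raw_data_df['RateCodeID'].astype('float64')
    raw_data_df.to_parquet('dataset/parquet/2015.parquet/')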

Upvotes: 4
