Ray Bell

Reputation: 1618

dask read parquet and specify schema

Is there a dask equivalent of spark's ability to specify a schema when reading in a parquet file? Possibly using kwargs passed to pyarrow?

I have a bunch of parquet files in a bucket, but some of the fields have slightly inconsistent names. I could create a custom delayed function to handle these cases after reading them, but I'm hoping I could specify the schema when opening them via globbing. Maybe not, though, as I guess opening them via globbing is going to try to concatenate them, which currently fails because of the inconsistent field names.
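
Ideally I'd like to hand the schema over up front, something like the sketch below. To be clear, I don't know whether read_parquet actually accepts or forwards a pyarrow schema this way; it's just the API I'm hoping exists:

import dask.dataframe as dd
import pyarrow as pa

# the unified schema I'd like all files coerced to
schema = pa.schema(
    [
        ("id", pa.int64()),
        ("a", pa.float32()),
        ("timestamp", pa.timestamp("ns")),
    ]
)

# hoped-for call; "schema" here is the hypothetical kwarg
df = dd.read_parquet("df.parquet", engine="pyarrow", schema=schema)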

Create a parquet file:

import dask.dataframe as dd

df = dd.demo.make_timeseries(
    start="2000-01-01",
    end="2000-01-03",
    dtypes={"id": int, "z": int},
    freq="1h",
    partition_freq="24h",
)

df.to_parquet("df.parquet", engine="pyarrow", overwrite=True)

Read it in via dask and specify the schema after reading:

df = dd.read_parquet("df.parquet", engine="pyarrow")
df["z"] = df["z"].astype("float")
df = df.rename(columns={"z": "a"})

Read it in via spark and specify the schema:

from pyspark.sql import SparkSession
import pyspark.sql.types as T
spark = SparkSession.builder.appName('App').getOrCreate()

schema = T.StructType(
    [
        T.StructField("id", T.IntegerType()),
        T.StructField("a", T.FloatType()),
        T.StructField("timestamp", T.TimestampType()),
    ]
)

df = spark.read.format("parquet").schema(schema).load("df.parquet")

Upvotes: 6

Views: 2780

Answers (1)

SultanOrazbayev

Reputation: 16561

Some of the options are:

  1. Specify dtypes after loading (requires consistent column names):
custom_dtypes = {"a": float, "id": int, "timestamp": "datetime64[ns]"}
df = dd.read_parquet("df.parquet", engine="pyarrow").astype(custom_dtypes)

For the files in the question this first option fails, because the field names are not consistent across files.

  2. If the column names are not the same across files, you might want to use a custom delayed function before loading:
import glob

import pandas as pd
import dask.dataframe as dd
from dask import delayed

@delayed
def custom_load(path):
    df = pd.read_parquet(path)
    # some logic to ensure consistent columns, for example:
    if "z" in df.columns:
        df = df.rename(columns={"z": "a"}).astype(custom_dtypes)  # custom_dtypes from option 1
    return df

dask_df = dd.from_delayed([custom_load(path) for path in glob.glob("some_path/*parquet")])
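
One note on from_delayed: if meta is not provided, dask will compute one of the delayed objects up front just to infer the column names and dtypes. You can avoid that by passing meta explicitly; a minimal sketch, assuming the target columns are id, a and timestamp as in the question:

import pandas as pd

# empty frame describing the expected output schema
meta = pd.DataFrame(
    {
        "id": pd.Series(dtype=int),
        "a": pd.Series(dtype=float),
        "timestamp": pd.Series(dtype="datetime64[ns]"),
    }
)

dask_df = dd.from_delayed(
    [custom_load(path) for path in glob.glob("some_path/*parquet")],
    meta=meta,
)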

Upvotes: 5
