Reputation: 1618
Is there a dask equivalent of spark's ability to specify a schema when reading in a parquet file? Possibly using kwargs passed to pyarrow?
I have a bunch of parquet files in a bucket, but some of the fields have slightly inconsistent names. I could create a custom delayed function to handle these cases after reading them, but I'm hoping I can specify the schema when opening them via globbing. Maybe not, though, as I guess opening them via globbing is going to try to concatenate them, and this currently fails because of the inconsistent field names.
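For context, the hoped-for call would look roughly like the sketch below. Building the pyarrow schema itself is straightforward; whether dd.read_parquet forwards a schema keyword to pyarrow depends on the dask version, so treat that part as an assumption rather than a confirmed feature:
import dask.dataframe as dd
import pyarrow as pa

# The pyarrow equivalent of Spark's StructType.
schema = pa.schema(
    [
        ("id", pa.int64()),
        ("a", pa.float64()),
        ("timestamp", pa.timestamp("ns")),
    ]
)

# Assumption: some dask versions let the pyarrow engine accept a schema
# keyword; check the read_parquet docs for the installed version.
# "bucket/*.parquet" is a hypothetical glob over the files in the bucket.
df = dd.read_parquet("bucket/*.parquet", engine="pyarrow", schema=schema)
Even where this is supported, an explicit schema enforces types but does not by itself reconcile columns that are named differently across files.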
Create a parquet file:
import dask.dataframe as dd
df = dd.demo.make_timeseries(
    start="2000-01-01",
    end="2000-01-03",
    dtypes={"id": int, "z": int},
    freq="1h",
    partition_freq="24h",
)
df.to_parquet("df.parquet", engine="pyarrow", overwrite=True)
Read it in via dask and specify the schema after reading:
df = dd.read_parquet("df.parquet", engine="pyarrow")
df["z"] = df["z"].astype("float")
df = df.rename(columns={"z": "a"})
Read it in via spark and specify the schema:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
spark = SparkSession.builder.appName('App').getOrCreate()
schema = T.StructType(
    [
        T.StructField("id", T.IntegerType()),
        T.StructField("a", T.FloatType()),
        T.StructField("timestamp", T.TimestampType()),
    ]
)
df = spark.read.format("parquet").schema(schema).load("df.parquet")
Upvotes: 6
Views: 2780
Reputation: 16561
Some of the options are:
Specify the dtypes after loading (this assumes the column names are already consistent):
custom_dtypes = {"a": float, "id": int, "timestamp": "datetime64[ns]"}
df = dd.read_parquet("df.parquet", engine="pyarrow").astype(custom_dtypes)
As noted in the question, this fails when the field names are inconsistent across files. In that case, wrap the read in a custom delayed function that fixes the columns before loading:
import glob
import pandas as pd
from dask import delayed
import dask.dataframe as dd

@delayed
def custom_load(path):
    df = pd.read_parquet(path)
    # some logic to ensure consistent columns, for example:
    # rename the inconsistently named field, then enforce the dtypes
    if "z" in df.columns:
        df = df.rename(columns={"z": "a"})
    return df.astype(custom_dtypes)
dask_df = dd.from_delayed([custom_load(path) for path in glob.glob("some_path/*parquet")])
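If something closer to an up-front schema declaration is wanted, dd.from_delayed also accepts a meta argument describing the expected output, which saves dask from inferring it by computing one of the delayed partitions. A minimal sketch, assuming custom_load returns id, a and timestamp as columns with the dtypes from custom_dtypes above (meta has to match the real output exactly, so adjust it if timestamp is actually the index):
import pandas as pd

# Assumed output shape; must mirror what custom_load really returns.
meta = pd.DataFrame(
    {
        "id": pd.Series(dtype="int64"),
        "a": pd.Series(dtype="float64"),
        "timestamp": pd.Series(dtype="datetime64[ns]"),
    }
)
dask_df = dd.from_delayed(
    [custom_load(path) for path in glob.glob("some_path/*parquet")],
    meta=meta,
)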
Upvotes: 5