Rae

Reputation: 21

PySpark - use datetime object with a PandasUDFType.GROUPED_MAP

I have created a pandas UDF to return the most recent 'count' for each ID. The 'date' column in the Spark DataFrame is a string type (YYYY-mm-dd). In the function below I use pd.to_datetime to convert the string to a datetime so I can get the max(date) for each ID. The function (below) works just fine when applied to a pandas DataFrame, but when I try to use it in Spark I get the following error.

AttributeError("Can only use .dt accessor with datetimelike " "values")

I've tried to first cast the date column to a DateType(), but the error stays the same.

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("id string, count int", PandasUDFType.GROUPED_MAP)
def recent_date(pdf):
    # parse the string column, then keep only the rows with the latest date
    pdf['date'] = pd.to_datetime(pdf.date)
    latest_data = pdf[pdf['date'] == pdf['date'].max()].copy()
    return latest_data[['id', 'count']]

I am calling the function like this:

df.groupby('id').apply(recent_date)
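
For context, here's a small example of the kind of data I'm working with (the rows below are made up, but the schema matches):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# 'id' and 'count' columns, plus a 'date' column stored as a plain string
df = spark.createDataFrame(
    [("a", "2021-01-01", 3), ("a", "2021-01-02", 5), ("b", "2021-01-02", 7)],
    ["id", "date", "count"],
)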

Any help would be really appreciated. Thanks.

Upvotes: 0

Views: 1532

Answers (1)

Dd__Mad

Reputation: 116

According to this answer and the list of supported types, pandas_udf does not currently support the date type with a grouped map UDF. (Oddly, I was able to use a grouped aggregate UDF with a date column; I'm not sure whether that's because it simply never hit any type-checking logic in my case.)

What I did was cast the date-typed column (in your case 'date') to timestamp type, and then it worked for me.

from pyspark.sql.functions import col, unix_timestamp

df.withColumn('date', unix_timestamp(col('date'), "yyyy-MM-dd").cast("timestamp")) \
    .groupBy('id').apply(recent_date)
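
Alternatively, to_timestamp from pyspark.sql.functions should give you the same conversion in one step (just a sketch; I only tested the unix_timestamp version above):

from pyspark.sql.functions import col, to_timestamp

df.withColumn('date', to_timestamp(col('date'), 'yyyy-MM-dd')) \
    .groupBy('id').apply(recent_date)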

Hope this helps.

Upvotes: 1
