Reputation: 1150
I am writing a spark dataframe to a bigquery table. This was working, but now I call a pandas udf before writing the data to bigquery. For some reason, when I call the pandas udf before writing the spark dataframe to bigquery I now see the following error:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 287, in dump_stream
    batch = _create_batch(series, self._timezone)
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 256, in _create_batch
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 256, in <listcomp>
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 240, in create_array
    return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False)
  File "pyarrow/array.pxi", line 474, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 169, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 69, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: an integer is required (got type Timestamp)
Judging from the executor logs below, it looks like it's being caused by an incorrect Parquet schema where the timestamp columns are being inferred as integers?
20/01/20 22:45:38 INFO ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:
{
  "type" : "struct",
  "fields" : [ {
    "name" : "id",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "firstname",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "status",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "entry_date",
    "type" : "timestamp",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "last_status_date",
    "type" : "timestamp",
    "nullable" : true,
    "metadata" : { }
  } ]
}
and corresponding Parquet message type:
message spark_schema {
  optional binary id (UTF8);
  optional binary firstname (UTF8);
  optional binary status (UTF8);
  optional int96 entry_date;
  optional int96 last_status_date;
}
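For context on the message type above: int96 is the physical encoding Spark uses by default for TimestampType columns in Parquet, so int96 fields are expected rather than a sign of a wrong schema on their own. A minimal sketch for checking which encoding the session is configured to write, assuming a SparkSession named spark and the source DataFrame named df (the fallback value passed to conf.get is an assumption):

# INT96 is Spark's long-standing default; TIMESTAMP_MICROS / TIMESTAMP_MILLIS are the alternatives.
print(spark.conf.get("spark.sql.parquet.outputTimestampType", "INT96"))

# The Catalyst schema itself still reports the columns as TimestampType.
df.printSchema()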
This is confusing because it does not happen when I run my code without applying the pandas_udf. The UDF does not manipulate the date columns in any way...
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import pandas_udf, PandasUDFType


def main():
    # df: source Spark DataFrame, loaded elsewhere

    # apply pandas udf
    df = df.groupBy('firstname').apply(my_pandas_udf)

    # drop some columns
    cols_to_drop = ['firstname']

    # save to bigquery
    df \
        .drop(*cols_to_drop) \
        .write \
        .format("bigquery") \
        .option("temporaryGcsBucket", "<TEMP_BUCKET_NAME>") \
        .option("project", "PROJECT_ID") \
        .option("credentialsFile", "/path/to/my/credentials.json") \
        .option("parentProject", "PROJECT_ID") \
        .option("table", "PROJECT_ID:dataset.table") \
        .mode("overwrite") \
        .save()


def udf_schema():
    return StructType([
        StructField('id', StringType(), True),
        StructField('firstname', StringType(), True),
        StructField('status', StringType(), True),
        StructField('entry_date', TimestampType(), True),
        StructField('last_status_date', TimestampType(), True),
    ])


@pandas_udf(udf_schema(), PandasUDFType.GROUPED_MAP)
def my_pandas_udf(df):
    df = df.sort_values('entry_date', ascending=False)
    oldest_date = df['entry_date'].iloc[0]
    df = df[df['entry_date'] >= oldest_date]
    df = df.copy()
    return df
What am I doing wrong? This Stack Overflow post seems to have a similar issue, but as of 1/21/2020 it has not been answered.
Edit (1): DataFrame datatypes before & after pandas_udf
The error occurs when returning from the pandas_udf, but here are the datatypes of the Spark DataFrame before it's passed to the pandas_udf:
==> BEFORE
id string
firstname string
status string
entry_date timestamp
date_status_change timestamp
last_status_date timestamp
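For anyone who wants to see what the UDF itself receives and hands back, here is a minimal debugging sketch, not part of the original code: the _debug name and the print calls are additions, and it reuses udf_schema() from above. The output lands in the executor container logs; if entry_date / last_status_date come out with dtype object there, that matches the explanation in the answer below.

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(udf_schema(), PandasUDFType.GROUPED_MAP)
def my_pandas_udf_debug(df):
    # dtypes of the pandas DataFrame Spark hands to the UDF
    print("incoming dtypes:\n", df.dtypes)
    df = df.sort_values('entry_date', ascending=False)
    oldest_date = df['entry_date'].iloc[0]
    df = df[df['entry_date'] >= oldest_date]
    # dtypes of the frame that goes back through the Arrow serializer
    print("outgoing dtypes:\n", df.dtypes)
    return df.copy()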
Upvotes: 5
Views: 10122
Reputation: 56
I ran into a similar problem recently, and I believe the error occurs because pandas casts every column to object dtype when it builds the DataFrame from Spark.
The way I fixed my issue was to explicitly cast the timestamp columns after the pandas DataFrame was created. So in your case, something like:
import pandas as pd

@pandas_udf(udf_schema(), PandasUDFType.GROUPED_MAP)
def my_pandas_udf(df):
    # explicitly cast the timestamp columns back to datetime64[ns]
    df['entry_date'] = pd.to_datetime(df['entry_date'])
    df['last_status_date'] = pd.to_datetime(df['last_status_date'])
    df = df.sort_values('entry_date', ascending=False)
    oldest_date = df['entry_date'].iloc[0]
    df = df[df['entry_date'] >= oldest_date]
    df = df.copy()
    return df
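To see the cast on its own, here is a standalone pandas sketch with made-up sample values (not taken from the code above); it only shows the dtype change that pd.to_datetime performs, which is what lets the Arrow serializer handle the column on the way back to Spark:

import pandas as pd

# Timestamp values stored with dtype 'object', mimicking what pandas can
# hand back inside the grouped-map UDF.
s = pd.Series([pd.Timestamp('2020-01-01 10:00'), pd.Timestamp('2020-01-15 12:30')],
              dtype=object)
print(s.dtype)        # object

# The explicit cast used in the fixed UDF.
s_cast = pd.to_datetime(s)
print(s_cast.dtype)   # datetime64[ns]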
Upvotes: 4