Reputation: 1545
I am using AWS EMR (5.29) to run a PySpark job, but I am receiving this error when I apply a pandas UDF:
pyarrow.lib.ArrowInvalid: Input object was not a NumPy array
Here's the dummy code to replicate the issue:
import pyspark.sql.functions as F
from pyspark.sql.types import *
df = spark.createDataFrame([
    (1, "A", "X1"),
    (2, "B", "X2"),
    (3, "B", "X3"),
    (1, "B", "X3"),
    (2, "C", "X2"),
    (3, "C", "X2"),
    (1, "C", "X1"),
    (1, "B", "X1"),
], ["id", "type", "code"])
and here's the dummy UDF:
schema = StructType([
    StructField("code", StringType()),
])

@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def dummy_udaf(pdf):
    pdf = pdf[['code']]
    return pdf
and when I run this line,
df.groupby('type').apply(dummy_udaf).show()
I get the following error:
An error occurred while calling o149.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 64 in stage 12.0 failed 4 times, most recent failure: Lost task 64.3 in stage 12.0 (TID 66, ip-10-161-108-245.vpc.internal, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/serializers.py", line 287, in dump_stream
batch = _create_batch(series, self._timezone)
File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/serializers.py", line 256, in _create_batch
arrs = [create_array(s, t) for s, t in series]
File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/serializers.py", line 256, in <listcomp>
arrs = [create_array(s, t) for s, t in series]
File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/serializers.py", line 254, in create_array
return pa.Array.from_pandas(s, mask=mask, type=t, safe=False)
File "pyarrow/array.pxi", line 755, in pyarrow.lib.Array.from_pandas
File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
File "pyarrow/error.pxi", line 84, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Input object was not a NumPy array
I tried using a downgraded version of Arrow as suggested here, and I also disabled the PyArrow optimization as suggested here, but nothing worked.
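For context, by "disabled the PyArrow optimization" I mean turning off Spark's Arrow-based conversion via the standard Spark 2.x config key, i.e. something like:

# Standard Spark 2.x switch for Arrow-based conversions; note that
# grouped-map pandas UDFs still serialize through Arrow regardless,
# which may be why this had no effect.
spark.conf.set("spark.sql.execution.arrow.enabled", "false")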
Upvotes: 4
Views: 3383
Reputation: 56
I had the same problem as you (on AWS EMR) and was able to resolve it by installing pyarrow==0.14.1. I don't know exactly why it didn't work for you, but one guess is that you needed to perform the installation in a bootstrap script so that it happens on every machine in your cluster. It won't be enough to just set the environment variable in the notebook you are working in. Hope this helps you!
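For example, a minimal bootstrap script might look like the one below. This is only a sketch: the right pip/python binary depends on your EMR release, and you'd upload the script to S3 and register it as a bootstrap action when creating the cluster.

#!/bin/bash
# Hypothetical bootstrap action: pin pyarrow on every node in the cluster
# before any Spark executors start.
sudo python3 -m pip install pyarrow==0.14.1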
Upvotes: 4