Vishwa

Reputation: 1545

Pyarrow error: while running a pandas udf in pyspark

I am using AWS EMR (5.29) to run a PySpark job, but I get this error when I apply a pandas UDF:

pyarrow.lib.ArrowInvalid: Input object was not a NumPy array

Here's the dummy code to replicate the issue.

import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType

df = spark.createDataFrame([
    (1, "A", "X1"),
    (2, "B", "X2"),
    (3, "B", "X3"),
    (1, "B", "X3"),
    (2, "C", "X2"),
    (3, "C", "X2"),
    (1, "C", "X1"),
    (1, "B", "X1"),
], ["id", "type", "code"])

and here's the dummy udf

schema = StructType([
    StructField("code", StringType()),
])


@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def dummy_udaf(pdf):
    pdf = pdf[['code']]
    return pdf

and when I run this line,

df.groupby('type').apply(dummy_udaf).show()

I am receiving this error:

An error occurred while calling o149.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 64 in stage 12.0 failed 4 times, most recent failure: Lost task 64.3 in stage 12.0 (TID 66, ip-10-161-108-245.vpc.internal, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/serializers.py", line 287, in dump_stream
    batch = _create_batch(series, self._timezone)
  File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/serializers.py", line 256, in _create_batch
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/serializers.py", line 256, in <listcomp>
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt2/yarn/usercache/livy/appcache/application_1585015669438_0003/container_1585015669438_0003_01_000253/pyspark.zip/pyspark/serializers.py", line 254, in create_array
    return pa.Array.from_pandas(s, mask=mask, type=t, safe=False)
  File "pyarrow/array.pxi", line 755, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 84, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Input object was not a NumPy array

I tried downgrading pyarrow, as suggested here, and also disabling the pyarrow optimization, as suggested here, but neither worked.

Upvotes: 4

Views: 3383

Answers (1)

dking

Reputation: 56

I had the same problem (also on AWS EMR) and resolved it by installing pyarrow==0.14.1. I don't know exactly why it didn't work for you, but my guess is that the installation needs to happen in a bootstrap script so that it runs on every machine in the cluster. Setting the environment variable only in the notebook you are working in won't be enough. Hope this helps!
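One way to check whether the pinned version actually reached every machine is to ask the executors which pyarrow they import. The following is only a minimal sketch: the helper names `pyarrow_version` and `executor_pyarrow_versions` are hypothetical, `sc` is assumed to be the active SparkContext (for example `spark.sparkContext`), and the task count is an arbitrary guess meant to spread tasks across executors, which Spark does not guarantee.

```python
import importlib


def pyarrow_version(_=None):
    """Return the pyarrow version visible to this Python process, or None if it cannot be imported."""
    try:
        return importlib.import_module("pyarrow").__version__
    except ImportError:
        return None


def executor_pyarrow_versions(sc, num_tasks=100):
    """Run many small tasks and collect the distinct pyarrow versions they report.

    `sc` is assumed to be a SparkContext. Scheduling decides where tasks run,
    so a clean result is a strong hint, not proof, that all nodes match.
    """
    rdd = sc.parallelize(range(num_tasks), num_tasks)
    versions = set(rdd.map(pyarrow_version).collect())
    # key=str lets None (pyarrow missing) sort alongside version strings
    return sorted(versions, key=str)


# Usage in a notebook session (not executed here):
#   print(pyarrow_version())                                # driver
#   print(executor_pyarrow_versions(spark.sparkContext))    # executors
```

If the executor list contains more than one entry, or differs from the driver's version, the install most likely did not run on every node.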

Upvotes: 4
