Sotos

Reputation: 51592

Apply function per group in pyspark -pandas_udf (No module named pyarrow)

I am trying to apply a function to each group of a dataset in PySpark. The first error I was getting was:

Py4JError: An error occurred while calling o62.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist

To solve the above, I removed the Spark function (I had spark.range()). That error is now gone, but I get the following instead:

File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/pyspark/serializers.py", line 276, in load_stream
    import pyarrow as pa
ImportError: No module named pyarrow 

But when I try pyarrow on its own, it works, i.e.:

import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"a": [1, 2, 3]})
pa.Table.from_pandas(df)
pyarrow.Table
a: int64
__index_level_0__: int64
metadata
--------
{'pandas': '{"pandas_version": "0.23.0", "index_columns": ["__index_level_0__"], "columns": [{"metadata": null, "field_name": "a", "name": "a", "numpy_type": "int64", "pandas_type": "int64"}, {"metadata": null, "field_name": "__index_level_0__", "name": null, "numpy_type": "int64", "pandas_type": "int64"}], "column_indexes": [{"metadata": null, "field_name": null, "name": null, "numpy_type": "object", "pandas_type": "bytes"}]}'}

EXAMPLE THAT FAILS - Taken from here

import pyspark.sql.functions as F
import pandas as pd

cols = ['id', 'val']
vals = [('A', 5), ('A', 3), ('A', 7), ('B', 12), ('B', 15), ('C', 3)]
d1 = sqlContext.createDataFrame(vals, cols)

>>> @F.pandas_udf(d1.schema, F.PandasUDFType.GROUPED_MAP)
... def subtract_mean(pdf):
...     return pdf.assign(val=pdf.val - pdf.val.mean())
...
>>> try1 = d1.groupby('id').apply(subtract_mean)
>>> try1.show()

If I try to convert d1 to a pandas DataFrame first, that does not work either, because a pandas DataFrame does not have a schema attribute.

What am I missing here?

Upvotes: 4

Views: 5971

Answers (2)

Mehdi LAMRANI

Reputation: 11597

Adding to the accepted answer:

One issue can be user delegation, i.e.:

  • the Python module was installed as userx
  • spark-submit was launched by userx
  • but spark is the technical user that actually executes the logic on the workers
    (behind the scenes, without you noticing).

It might be that spark is not seeing the module.
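A quick way to check this (a sketch only, assuming an already running SparkContext named sc) is to ask the executors which OS user and Python interpreter they actually run as:

def executor_identity(_):
    # Runs on the executors, not on the driver.
    import getpass, sys
    return (getpass.getuser(), sys.executable)

print(set(sc.parallelize(range(8), 8).map(executor_identity).collect()))

If the reported user or interpreter is not the one pyarrow was installed for, that is the mismatch described above.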

To fix that:
su as the spark user and install the missing module via pip install (on all workers).

That fixed it for me.

Edit:
Depending on your settings, the pyspark shell might still be able to see your module, though.

Upvotes: 0

user10006052

Reputation: 76

pyarrow has to be present on the path on each worker node.

  • If you run this code on a single node, make sure that PYSPARK_PYTHON (and optionally its PYTHONPATH) points to the same interpreter you used to test the pyarrow code (see the sketch after this list).
  • If you use a cluster, make sure that pyarrow is installed on each node, in addition to the points made above.
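For the single-node case, here is a minimal sketch of pinning the executors to a specific interpreter. The interpreter path is only an assumption; point it at whichever environment actually has pyarrow installed, and make sure that path exists on every worker:

import os
from pyspark.sql import SparkSession

# Hypothetical interpreter path - replace it with the environment in which
# you tested pyarrow; it must exist at the same location on every worker.
pyarrow_python = "/opt/conda/bin/python"

# Must be set before the SparkSession / SparkContext is created.
os.environ["PYSPARK_PYTHON"] = pyarrow_python

spark = (
    SparkSession.builder
    .appName("pandas-udf-grouped-map")
    # Config-level equivalent of PYSPARK_PYTHON for the executors (Spark 2.1+).
    .config("spark.pyspark.python", pyarrow_python)
    .getOrCreate()
)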

Moreover, make sure that the installed pyarrow version is greater than or equal to the minimum supported one (0.8 today), although an outdated version should cause a different exception.
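To verify both the import and the version on every executor, a small diagnostic sketch (assuming a live SparkSession named spark):

def pyarrow_status(_):
    # Runs on the executors; returns the pyarrow version or the import error.
    try:
        import pyarrow
        return pyarrow.__version__
    except ImportError as exc:
        return "missing: %s" % exc

sc = spark.sparkContext
print(set(sc.parallelize(range(16), 16).map(pyarrow_status).collect()))

Any "missing" entry in the output means at least one worker still lacks the module.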

Upvotes: 6
