Reputation: 51592
I am trying to apply a function to each group of a dataset in pyspark. The first error I was getting was
Py4JError: An error occurred while calling o62.__getnewargs__. Trace: py4j.Py4JException: Method __getnewargs__([]) does not exist
To solve the above, I removed the spark function (I had spark.range()). That error is now gone, but I get the following instead:
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/pyspark/serializers.py", line 276, in load_stream import pyarrow as pa ImportError: No module named pyarrow
But when I try pyarrow on its own, it works, i.e.:
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"a": [1, 2, 3]})
pa.Table.from_pandas(df)
pyarrow.Table
a: int64
__index_level_0__: int64
metadata
--------
{'pandas': '{"pandas_version": "0.23.0", "index_columns": ["__index_level_0__"], "columns": [{"metadata": null, "field_name": "a", "name": "a", "numpy_type": "int64", "pandas_type": "int64"}, {"metadata": null, "field_name": "__index_level_0__", "name": null, "numpy_type": "int64", "pandas_type": "int64"}], "column_indexes": [{"metadata": null, "field_name": null, "name": null, "numpy_type": "object", "pandas_type": "bytes"}]}'}
EXAMPLE THAT FAILS - Taken from here
import pyspark.sql.functions as F
import pandas as pd
cols = ['id', 'val']
vals = [('A', 5), ('A', 3), ('A', 7), ('B', 12), ('B', 15), ('C', 3)]
d1 = sqlContext.createDataFrame(vals, cols)
@F.pandas_udf(d1.schema, F.PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # subtract the group mean of 'val' from each row of the group
    return pdf.assign(val=pdf.val - pdf.val.mean())

try1 = d1.groupby('id').apply(subtract_mean)
try1.show()
If I try to convert d1 to a pandas DataFrame first, it does not work, because a pandas DataFrame does not have a schema attribute.
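As far as I understand, the first argument to pandas_udf has to be a Spark schema rather than a pandas object. A minimal sketch of spelling it out explicitly instead of reusing d1.schema (the column types are my assumption based on vals above):

from pyspark.sql.types import StructType, StructField, StringType, LongType

# explicit output schema, equivalent to d1.schema for the data above
out_schema = StructType([
    StructField("id", StringType()),
    StructField("val", LongType()),
])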
What am I missing here?
Upvotes: 4
Views: 5971
Reputation: 11597
Adding to the accepted answer:

One issue can be user delegation: i.e., you install the module and test your code as userx, but spark is the technical user executing the logic on the workers. It might be that the spark user does not see the module.

To fix that: su as spark and install the missing module via pip install (on all workers).

That fixed it for me.

Edit: depending on your settings, the pyspark shell might still be able to see your module though.
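If you want to check which OS user and interpreter the executors actually use, here is a small diagnostic sketch (it assumes a SparkContext named sc, e.g. the one the pyspark shell gives you):

import sys, getpass

def worker_env(_):
    # runs on an executor: report the OS user, the interpreter path,
    # and whether pyarrow is importable there
    import sys, getpass
    try:
        import pyarrow
        arrow_version = pyarrow.__version__
    except ImportError:
        arrow_version = None
    return [(getpass.getuser(), sys.executable, arrow_version)]

print((getpass.getuser(), sys.executable))  # driver side
print(sc.parallelize([0], numSlices=1).mapPartitions(worker_env).collect())  # executor side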
Upvotes: 0
Reputation: 76
pyarrow has to be present on the path on each worker node. Make sure that:

PYSPARK_PYTHON (and optionally its PYTHONPATH) point to the same interpreter you use to test your pyarrow code.

pyarrow is installed on each node, in addition to the points made above.

Moreover, make sure that the installed pyarrow version is greater than or equal to the minimum supported one (0.8 today), although a version mismatch should cause a different exception.
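A rough sketch of both checks (the interpreter path below is only an assumption, adjust it to wherever pyarrow is actually installed; PYSPARK_PYTHON has to be set before the SparkSession/SparkContext is created):

import os

# point the executors at the interpreter that has pyarrow installed
# (hypothetical path - replace with your environment)
os.environ["PYSPARK_PYTHON"] = "/opt/myenv/bin/python"

import pyarrow
print(pyarrow.__version__)  # should be >= 0.8.0 for pandas_udf on Spark 2.3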
Upvotes: 6