Reputation: 2609
I am a bit new to PySpark and Python. I am trying to run an ML function as a PySpark UDF.
Here is an example:
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType

df = spark.createDataFrame(['Bob has a dog. He loves him'], StringType())

def parse(text):
    import spacy
    import neuralcoref
    nlp = spacy.load('en_core_web_sm')
    # Let's try before using the conversion dictionary:
    neuralcoref.add_to_pipe(nlp)
    doc = nlp(text)
    return doc._.coref_resolved

pd_udf = pandas_udf(parse, returnType=StringType())
df.select(pd_udf(col("value"))).show()
I am getting this error:
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 286, in dump_stream
    for series in iterator:
  File "<string>", line 1, in <lambda>
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 101, in <lambda>
    return lambda *a: (verify_result_length(*a), arrow_return_type)
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 92, in verify_result_length
    result = f(*a)
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 7, in parse
  File "/home/user/anaconda3/lib/python3.7/site-packages/spacy/language.py", line 377, in __call__
    doc = self.make_doc(text)
  File "/home/user/anaconda3/lib/python3.7/site-packages/spacy/language.py", line 401, in make_doc
    return self.tokenizer(text)
TypeError: Argument 'string' has incorrect type (expected str, got Series)
Is it possible to run this code on PySpark?
Upvotes: 5
Views: 1047
Reputation: 5032
Another way would be to use a grouped Pandas UDF via applyInPandas, if you are more familiar with Pandas in general:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StringType, StructType, StructField
from functools import partial
import pandas as pd
import spacy
import neuralcoref

sc = SparkContext.getOrCreate()
sql = SQLContext(sc)

#### Build the pipeline once, then broadcast it so it is available across the worker nodes
nlp_pipeline = spacy.load('en_core_web_sm')
neuralcoref.add_to_pipe(nlp_pipeline)
nlp = sc.broadcast(nlp_pipeline)

def udf_parse(pdf, input_col='value'):
    # pdf is a pandas DataFrame; resolve coreferences row by row
    # using the broadcast pipeline (dereferenced with .value)
    pdf['parsed_value'] = pdf[input_col].apply(
        lambda text: nlp.value(text)._.coref_resolved
    )
    return pdf

sparkDF = sql.createDataFrame(['Bob has a dog. He loves him'], StringType())

schema = StructType([
    StructField('value', StringType(), True),
    StructField('parsed_value', StringType(), True)
])

partial_func = partial(udf_parse, input_col='value')
sparkDF_agg = sparkDF.groupby().applyInPandas(partial_func, schema)
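Loading the pipeline once on the driver and broadcasting it avoids re-running spacy.load inside every UDF call; the workers only dereference nlp.value. Since Spark is lazy, nothing runs until you trigger an action, so to inspect the result you could follow this with something like:

sparkDF_agg.show(truncate=False)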
Upvotes: 1
Reputation: 169
Hi, so I had a whole bunch of issues setting up spacy and neuralcoref, so I replaced the nlp function with a dummy function that just reverses the string.
But basically what's happening here is that when you pass col("value") into pd_udf, it arrives as a pandas Series, not a single string. So you should define your parse function to accept that type, like so:
import pandas as pd
from pyspark.sql.functions import pandas_udf

# Stand-in for the real NLP call: just reverse the string
def nlp(text):
    return text[::-1]

@pandas_udf("string")
def parse(text: pd.Series) -> pd.Series:
    # Apply the per-string function to every element of the Series
    return text.apply(nlp)

df = spark.createDataFrame([("Bob has a dog. He loves him",), ("dog jumps over the fox",)], ("my_text",))
df.select(parse("my_text")).show()
This gives the following result:
+--------------------+
| parse(my_text)|
+--------------------+
|mih sevol eH .god...|
|xof eht revo spmu...|
+--------------------+
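For reference, once spacy and neuralcoref do install cleanly, the real parse from the question could follow the same Series-in/Series-out pattern. This is an untested sketch assuming en_core_web_sm and neuralcoref are available on every worker; the model is loaded once per batch inside the UDF rather than once per row:

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def parse(text: pd.Series) -> pd.Series:
    # Assumption: spacy + neuralcoref are installed on the workers
    import spacy
    import neuralcoref
    nlp = spacy.load('en_core_web_sm')  # loaded once per batch, not per row
    neuralcoref.add_to_pipe(nlp)
    return text.apply(lambda t: nlp(t)._.coref_resolved)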
Upvotes: 1