John Doe

Reputation: 10203

Pandas UDF (PySpark) - Incorrect type Error

I'm trying entity extraction with spaCy and Pandas UDF (PySpark) but I get an error.
Using a UDF works without errors but is slow. What am I doing wrong?

Loading the model on every call is to avoid the load error: Can't find model 'en_core_web_lg'. It doesn't seem to be a shortcut link, a Python package or a valid path to a data directory.

Working UDF:

def __get_entities(x):

    global nlp
    nlp = spacy.load("en_core_web_lg")
    ents=[]

    doc = nlp(x)

    for ent in doc.ents:
        if ent.label_ == 'PERSON' or ent.label_ == 'ORG':
            ents.append(ent.label_)

    return ents

get_entities_udf = F.udf(__get_entities, T.ArrayType(T.StringType()))

Pandas UDF with error:

def __get_entities(x):

    global nlp
    nlp = spacy.load("en_core_web_lg")
    ents=[]

    doc = nlp(x)

    for ent in doc.ents:
        if ent.label_ == 'PERSON' or ent.label_ == 'ORG':
            ents.append(ent.label_)

    return pd.Series(ents)


get_entities_udf = F.pandas_udf(lambda x: __get_entities(x), "array<string>", F.PandasUDFType.SCALAR)

Error message:

TypeError: Argument 'string' has incorrect type (expected str, got Series)

Sample Spark DataFrame:

df = spark.createDataFrame([
  ['John Doe'],
  ['Jane Doe'],
  ['Microsoft Corporation'],
  ['Apple Inc.'],
]).toDF("name")

New column:

df_new = df.withColumn('entity',get_entities_udf('name'))

Upvotes: 0

Views: 1330

Answers (2)

codekoriko

Reputation: 880

I'm using pyspark 3.1.1 and Python 3.7.

The other answer didn't work for me, and I spent quite some time making things work, so I thought I'd share the solution I came up with.

Setting things up

Creating a sample of 16 random person and company names:

import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import pandas_udf, PandasUDFType

from faker import Faker
import spacy

spark = SparkSession.builder.appName("pyspark_sandbox").getOrCreate()

names = []
fake = Faker()
for _ in range(8):
    names.append(f"{fake.company()} {fake.company_suffix()}")
    names.append(fake.name())

df = spark.createDataFrame(names, StringType())

As it is

First, checking the other proposed solution. I'm just adding a print statement upon loading the spaCy model to see how many times we load it.

# printing a msg each time we load the model
def load_spacy_model():
    print("Loading spacy model...")
    return spacy.load("en_core_web_sm")

def entities(x):
    global nlp
    import spacy
    nlp = load_spacy_model()
    ents=[]

    doc = nlp(x)

    for ent in doc.ents:
        if ent.label_ == 'PERSON' or ent.label_ == 'ORG':
            ents.append(ent.label_)
    return ents


def __get_entities(x):
    return x.apply(entities)

get_entities_udf = pandas_udf(lambda x: __get_entities(x), "array<string>", PandasUDFType.SCALAR)

df_new = df.withColumn('entity',get_entities_udf('value'))

df_new.show()

We can then see that the model is loaded 16 times, so one for every single entry we process. Not what I want.
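
If the print output is hard to spot (executor stdout doesn't always reach the driver console), a Spark accumulator can count the loads instead. A small sketch, where load_count is a name I made up; note that task retries can make accumulators overcount:

load_count = spark.sparkContext.accumulator(0)

def load_spacy_model():
    load_count.add(1)  # incremented on the workers, readable on the driver
    return spacy.load("en_core_web_sm")

# rebuild the UDF so it picks up this variant, then force a full pass
get_entities_udf = pandas_udf(lambda x: __get_entities(x), "array<string>", PandasUDFType.SCALAR)
df.withColumn('entity', get_entities_udf('value')).collect()
print(f"Loaded the model {load_count.value} times")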

Batch processing

Rewriting it with the decorator introduced in Spark 3.0+, which relies on type hints (Python 3.6+). Our UDF then uses nlp.pipe() to batch process the entire pd.Series.

# printing a msg each time we load the model
def load_spacy_model():
    print("Loading spacy model...")
    return spacy.load("en_core_web_sm")

# decorator declaring this function as a pandas_udf
# that returns an array of strings for each row
@pandas_udf(ArrayType(StringType()))
# function receiving a pd.Series and returning a pd.Series
def entities(list_of_text: pd.Series) -> pd.Series:
    global nlp
    nlp = load_spacy_model()
    docs = nlp.pipe(list_of_text)

    # retrieving the str representation of the entity label,
    # as we are limited in the types of objects
    # we can return from a pandas_udf
    # (we couldn't return a Span object, for example)
    ents=[
        [ent.label_ for ent in doc.ents]
        for doc in docs
    ]
    return pd.Series(ents)


df_new = df.withColumn('entity',entities('value'))

df_new.show()

In my case the model was loaded 4 times, which is better: it is loaded each time a Python worker is created to process a batch. So the number depends on how many cores Spark is using, but more critically in my case on how partitioned our data is. So it's not yet optimal.
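
The partition count can be inspected and tuned with the standard DataFrame API; a quick sketch:

# how many partitions, and therefore how many worker batches, the data is split into
print(df.rdd.getNumPartitions())

# coalesce() lowers the partition count without a full shuffle,
# which also caps how many times the model gets loaded
df_coalesced = df.coalesce(2)
print(df_coalesced.rdd.getNumPartitions())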

Broadcasting the nlp object

# printing a msg each time we load the model
def load_spacy_model():
    print("Loading spacy model...")
    return spacy.load("en_core_web_sm")

@pandas_udf(ArrayType(StringType()))
def entities(list_of_text: pd.Series) -> pd.Series:
    nlp = broadcasted_nlp.value
    docs = nlp.pipe(list_of_text)

    # retrieving the str representation of the entity label,
    # as we are limited in the types of objects
    # we can return from a pandas_udf
    # (we couldn't return a Span object, for example)
    ents=[
        [ent.label_ for ent in doc.ents]
        for doc in docs
    ]
    return pd.Series(ents)

broadcasted_nlp = spark.sparkContext.broadcast(load_spacy_model())

df_new = df.withColumn('entity',entities('value'))

df_new.show()

Now the model is loaded only once and then broadcast to every Python worker that gets spawned.
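
One cleanup note, using the standard PySpark Broadcast API: once the job no longer needs the model, the executor copies can be released.

# removes the cached copies on the executors; the value is re-sent if used again
broadcasted_nlp.unpersist()

# or release everything; the broadcast variable can no longer be used afterwards
# broadcasted_nlp.destroy()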

The complete code

import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import pandas_udf, PandasUDFType

from faker import Faker
import spacy

spark = SparkSession.builder.appName("pyspark_sandbox").getOrCreate()

# creating our set of fake person and company names
names = []
fake = Faker()
for _ in range(8):
    names.append(f"{fake.company()} {fake.company_suffix()}")
    names.append(fake.name())

df = spark.createDataFrame(names, StringType())

# printing a msg each time we load the model
def load_spacy_model():
    print("Loading spacy model...")
    return spacy.load("en_core_web_sm")

# decorator declaring this function as a pandas_udf
# that returns an array of strings for each row
@pandas_udf(ArrayType(StringType()))
# function receiving a pd.Series and returning a pd.Series
def entities(list_of_text: pd.Series) -> pd.Series:
    # retrieving the shared nlp object
    nlp = broadcasted_nlp.value
    # batch processing our list of text
    docs = nlp.pipe(list_of_text)
    
    # retrieving the str representation of the entity label,
    # as we are limited in the types of objects
    # we can return from a pandas_udf
    # (we couldn't return a Span object, for example)
    ents=[
        [ent.label_ for ent in doc.ents]
        for doc in docs
    ]
    return pd.Series(ents)

# we load the spacy model and broadcast it
broadcasted_nlp = spark.sparkContext.broadcast(load_spacy_model())

df_new = df.withColumn('entity',entities('value'))

df_new.show(truncate=False)

Result

+----------------------------------+--------------------------------+
|value                             |entity                          |
+----------------------------------+--------------------------------+
|Ferguson, Price and Green Ltd     |[ORG, ORG, ORG]                 |
|Cassandra Goodman MD              |[PERSON]                        |
|Solis Ltd LLC                     |[ORG]                           |
|Laurie Foster                     |[PERSON]                        |
|Lane-Vasquez Group                |[ORG]                           |
|Matthew Wright                    |[PERSON]                        |
|Scott, Pugh and Rodriguez and Sons|[PERSON, PERSON, PERSON, PERSON]|
|Tina Cooke                        |[PERSON]                        |
|Watkins, Blake and Foster Ltd     |[ORG]                           |
|Charles Reyes                     |[PERSON]                        |
|Cooper, Norris and Roberts PLC    |[ORG]                           |
|Michael Tate                      |[PERSON]                        |
|Powell, Lawson and Perez and Sons |[PERSON, PERSON, PERSON, PERSON]|
|James Wolf PhD                    |[PERSON]                        |
|Greer-Swanson PLC                 |[ORG]                           |
|Nicholas Hale                     |[PERSON]                        |
+----------------------------------+--------------------------------+

Upvotes: 1

tourist

Reputation: 4333

You need to treat the input as a pd.Series instead of a single value.
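
To see why: a scalar pandas UDF hands your function the whole column as a pd.Series, while spaCy's nlp() expects a single string. This should reproduce the same TypeError even outside Spark:

import pandas as pd
import spacy

nlp = spacy.load("en_core_web_lg")
# nlp() wants one str; passing a whole Series triggers the TypeError from the question
nlp(pd.Series(["John Doe"]))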

I was able to get it working by refactoring the code a bit. Notice the x.apply call, which is pandas-specific and applies the entities function to every element of the pd.Series.

def entities(x):
    global nlp
    import spacy
    nlp = spacy.load("en_core_web_lg")
    ents=[]

    doc = nlp(x)

    for ent in doc.ents:
        if ent.label_ == 'PERSON' or ent.label_ == 'ORG':
            ents.append(ent.label_)
    return ents


def __get_entities(x):
    return x.apply(entities)

get_entities_udf = pandas_udf(lambda x: __get_entities(x), "array<string>", PandasUDFType.SCALAR)

df_new = df.withColumn('entity',get_entities_udf('name'))

df_new.show()

+--------------------+--------+
|                name|  entity|
+--------------------+--------+
|            John Doe|[PERSON]|
|            Jane Doe|[PERSON]|
|Microsoft Corpora...|   [ORG]|
|          Apple Inc.|   [ORG]|
+--------------------+--------+
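
Side note: in Spark 3.0+ the PandasUDFType.SCALAR style used here is deprecated in favor of type hints (as in the other answer); the equivalent would look roughly like this:

from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("array<string>")
def get_entities_udf(s: pd.Series) -> pd.Series:
    # same element-wise apply, now with Spark 3 type hints
    return s.apply(entities)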

Upvotes: 1
