n1tk

Reputation: 2490

PySpark UDF to Pandas UDF for string columns

I have a UDF that is slow on large datasets, and I am trying to improve execution time and scalability by leveraging pandas_udfs. My searching and the official documentation focus on the scalar and mapping approaches, which I have already used, but I am failing to extend this to a Series or pandas DataFrame approach. Can you point me in the right direction?

I want to process records in parallel; the current UDF approach is very slow since it handles records sequentially, one by one. I also have a solution in Koalas, but I would rather include this as part of a custom transformer in a PySpark pipeline:

Below is the UDF approach (the working one):

import re

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, pandas_udf, udf, PandasUDFType
from pyspark.sql.types import ArrayType, StringType
import pyspark.sql.functions as F

def ngrams_udf(string, n=3):
    """Takes an input string, cleans it and converts to ngrams. 
    This script is focussed on cleaning UK company names but can be made generic by removing lines below"""
    string = str(string)
    string = string.lower() # lower case
    string = string.encode("ascii", errors="ignore").decode() #remove non ascii chars
    chars_to_remove = [")","(",".","|","[","]","{","}","'","-"]
    rx = '[' + re.escape(''.join(chars_to_remove)) + ']' #remove punc, brackets etc...
    string = re.sub(rx, '', string)
    string = string.replace('&', 'and')
    # replace the longer phrase first so the 'limited' rule doesn't clobber it
    string = string.replace('public limited company', 'plc')
    string = string.replace('limited', 'ltd')
    string = string.replace('united states of america', 'usa')
    string = string.replace('community interest company', 'cic')
    string = string.title() # normalise case - capital at start of each word
    string = re.sub(' +',' ',string).strip() # get rid of multiple spaces and replace with a single
    string = ' '+ string +' ' # pad names for ngrams...
    ngrams = zip(*[string[i:] for i in range(n)])
    return [''.join(ngram) for ngram in ngrams]
    
# register the UDF (at module level, outside the function)
dummy_ngram_udf = udf(ngrams_udf, ArrayType(StringType()))

# call the UDF on a string column; it returns an array column
df = df.withColumn(out_col, dummy_ngram_udf(col(in_col)))
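
For reference, the kind of custom transformer I want to wrap this in looks roughly like the sketch below (the class name NGramTransformer is just illustrative):

class NGramTransformer(Transformer, HasInputCol, HasOutputCol,
                       DefaultParamsReadable, DefaultParamsWritable):
    """Sketch: applies the ngram UDF to inputCol and writes outputCol."""

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super().__init__()
        # keyword_only stashes the constructor kwargs on self._input_kwargs
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def _transform(self, df: DataFrame) -> DataFrame:
        return df.withColumn(self.getOutputCol(),
                             dummy_ngram_udf(col(self.getInputCol())))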

I tried the following, but it does not map the input and output as Series, so the input and output vectors end up with different sizes:

from pandas import Series
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct

@pandas_udf("string")
def ngrams_udf(string: pd.Series , n=3) -> pd.Series:
    """Takes an input string, cleans it and converts to ngrams. 
    This script is focussed on cleaning UK company names but can be made generic by removing lines below"""
    string = str(string)
    string = string.lower() # lower case
    string = string.encode("ascii", errors="ignore").decode() #remove non ascii chars
    chars_to_remove = [")","(",".","|","[","]","{","}","'","-"]
    rx = '[' + re.escape(''.join(chars_to_remove)) + ']' #remove punc, brackets etc...
    string = re.sub(rx, '', string)
    string = string.replace('&', 'and')
    string = string.replace('limited', 'ltd')
    string = string.replace('public limited company', 'plc')
    string = string.replace('united states of america', 'usa')
    string = string.replace('community interest company', 'cic')
    string = string.title() # normalise case - capital at start of each word
    string = re.sub(' +',' ',string).strip() # get rid of multiple spaces and replace with a single
    string = ' '+ string +' ' # pad names for ngrams...
    ngrams = zip(*[string[i:] for i in range(n)])
    return [''.join(ngram) for ngram in ngrams]

Upvotes: 1

Views: 2168

Answers (1)

blackbishop

Reputation: 32640

In general, try to avoid Python UDFs when you can do the same thing using only Spark builtin functions. Although pandas_udf brings better performance than a plain Python UDF, native Spark functions should "always" be faster.
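
For example, a builtin-only version of the cleaning and n-gram steps could look roughly like the sketch below for n=3 (assuming Spark 2.4+ for transform/sequence; column names are illustrative, and the greatest(..., 1) guard only avoids a descending sequence on very short strings):

from pyspark.sql import functions as F

# clean the column using builtin functions only
cleaned = F.lower(F.col("company"))
cleaned = F.regexp_replace(cleaned, r"[)(.|\[\]{}'-]", "")   # strip punctuation
cleaned = F.regexp_replace(cleaned, "&", "and")
cleaned = F.regexp_replace(cleaned, "public limited company", "plc")
cleaned = F.regexp_replace(cleaned, "limited", "ltd")
cleaned = F.initcap(cleaned)                                 # capital at start of each word
cleaned = F.trim(F.regexp_replace(cleaned, r"\s+", " "))     # collapse multiple spaces
cleaned = F.concat(F.lit(" "), cleaned, F.lit(" "))          # pad for ngrams

df2 = (
    df.withColumn("cleaned", cleaned)
      .withColumn(
          "ngrams",
          # substring is 1-based; a string of length L has L - 3 + 1 trigrams
          F.expr("transform(sequence(1, greatest(length(cleaned) - 2, 1)), "
                 "i -> substring(cleaned, i, 3))"),
      )
)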

Now for your question: the pandas_udf receives a pd.Series, so you need to adapt the code, since your variable string is no longer a single string but a Series.

import re

import pandas as pd

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

@F.pandas_udf(ArrayType(StringType()))
def ngrams_udf(string: pd.Series, n: pd.Series) -> pd.Series:
    """Takes an input string, cleans it and converts to ngrams.
    This script is focussed on cleaning UK company names but can be made generic by removing lines below"""

    n = n.iloc[0]
    string = string.str.lower()  # lower case
    string = string.str.encode("ascii", errors="ignore").str.decode("utf8")  # remove non ascii chars

    chars_to_remove = [")", "(", ".", "|", "[", "]", "{", "}", "'", "-"]
    rx = '[' + re.escape(''.join(chars_to_remove)) + ']'  # remove punc, brackets etc...
    string = string.str.replace(rx, '', regex=True)

    string = string.str.replace('&', 'and')
    # replace the longer phrase first so the 'limited' rule doesn't clobber it
    string = string.str.replace('public limited company', 'plc')
    string = string.str.replace('limited', 'ltd')
    string = string.str.replace('united states of america', 'usa')
    string = string.str.replace('community interest company', 'cic')
    string = string.str.title()  # normalise case - capital at start of each word

    # get rid of multiple spaces and replace with a single one
    string = string.str.replace(r"\s+", ' ', regex=True).str.strip()

    string = ' ' + string + ' '  # pad names for ngrams (broadcasts over the Series)

    string = string.apply(lambda x: zip(*[x[i:] for i in range(n)]))
    string = string.apply(lambda x: [''.join(ngram) for ngram in list(x)])

    return string

Then call it with:

df.withColumn("ngrams", ngrams_udf(F.col("company"), F.lit(3)))

Upvotes: 2
