Reputation: 2490
I have a UDF that is slow on large datasets, and I am trying to improve execution time and scalability by leveraging pandas_udfs. All my searching and the official documentation focus on the scalar and mapping approaches, which I already use, but I am failing to extend this to a Series / pandas DataFrame approach. Can you point me in the right direction?
I want to process records in parallel; the current UDF approach is very slow since it handles records sequentially, one by one. I also have a solution in Koalas, but I would rather include this as part of a custom transformer in a PySpark pipeline (a rough sketch of the wrapper I have in mind follows the working UDF below).
Below is the UDF approach (the working one):
import re

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, pandas_udf, udf
from pyspark.sql.types import ArrayType, StringType
import pyspark.sql.functions as F
def ngrams_udf(string, n=3):
    """Takes an input string, cleans it and converts to ngrams.
    This script is focussed on cleaning UK company names but can be made generic by removing lines below"""
    string = str(string)
    string = string.lower()  # lower case
    string = string.encode("ascii", errors="ignore").decode()  # remove non-ASCII chars
    chars_to_remove = [")", "(", ".", "|", "[", "]", "{", "}", "'", "-"]
    rx = '[' + re.escape(''.join(chars_to_remove)) + ']'  # remove punctuation, brackets etc...
    string = re.sub(rx, '', string)
    string = string.replace('&', 'and')
    string = string.replace('limited', 'ltd')
    string = string.replace('public limited company', 'plc')
    string = string.replace('united states of america', 'usa')
    string = string.replace('community interest company', 'cic')
    string = string.title()  # normalise case - capital at start of each word
    string = re.sub(' +', ' ', string).strip()  # collapse multiple spaces into a single one
    string = ' ' + string + ' '  # pad names for ngrams...
    ngrams = zip(*[string[i:] for i in range(n)])
    return [''.join(ngram) for ngram in ngrams]
# register the UDF
dummy_ngram_udf = udf(ngrams_udf, ArrayType(StringType()))
# call the UDF on a string column; it returns an array column
df.withColumn(out_col, dummy_ngram_udf(col(in_col)))
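For context, the custom transformer I have in mind wraps the UDF roughly like this (just a rough sketch; the class name NGramTransformer is a placeholder):
# Rough sketch of the pipeline-compatible wrapper; NGramTransformer is just
# a placeholder name, not existing code.
class NGramTransformer(Transformer, HasInputCol, HasOutputCol,
                       DefaultParamsReadable, DefaultParamsWritable):
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super().__init__()
        self._set(**self._input_kwargs)

    def _transform(self, df: DataFrame) -> DataFrame:
        # applies the (slow) UDF column-wise; this is the part I want to speed up
        return df.withColumn(self.getOutputCol(),
                             dummy_ngram_udf(col(self.getInputCol())))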
I tried the following, but it does not map the input and output to a Series, so the input and output vectors end up with different sizes:
import re

import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct
@pandas_udf("string")
def ngrams_udf(string: pd.Series , n=3) -> pd.Series:
"""Takes an input string, cleans it and converts to ngrams.
This script is focussed on cleaning UK company names but can be made generic by removing lines below"""
string = str(string)
string = string.lower() # lower case
string = string.encode("ascii", errors="ignore").decode() #remove non ascii chars
chars_to_remove = [")","(",".","|","[","]","{","}","'","-"]
rx = '[' + re.escape(''.join(chars_to_remove)) + ']' #remove punc, brackets etc...
string = re.sub(rx, '', string)
string = string.replace('&', 'and')
string = string.replace('limited', 'ltd')
string = string.replace('public limited company', 'plc')
string = string.replace('united states of america', 'usa')
string = string.replace('community interest company', 'cic')
string = string.title() # normalise case - capital at start of each word
string = re.sub(' +',' ',string).strip() # get rid of multiple spaces and replace with a single
string = ' '+ string +' ' # pad names for ngrams...
ngrams = zip(*[string[i:] for i in range(n)])
return [''.join(ngram) for ngram in ngrams]
Upvotes: 1
Views: 2168
Reputation: 32640
In general, try to avoid Python UDFs when you can do the same thing using only Spark builtin functions. Although pandas_udf brings better performance, native Spark functions should "always" be faster.
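For example, most of this cleaning, and even the character n-grams, can be expressed with builtin functions only. A rough sketch, assuming Spark 2.4+ (for higher-order functions), a column named company, and non-empty names; the domain-specific word replacements are elided:
from pyspark.sql import functions as F

cleaned = (
    df
    .withColumn("clean", F.lower(F.col("company")))
    .withColumn("clean", F.regexp_replace("clean", r"[)(.|\[\]{}'-]", ""))
    .withColumn("clean", F.regexp_replace("clean", "&", "and"))
    # ... same idea for 'limited' -> 'ltd' and the other word replacements
    .withColumn("clean", F.regexp_replace("clean", r"\s+", " "))
    .withColumn("clean", F.concat(F.lit(" "), F.trim("clean"), F.lit(" ")))
    # character trigrams via higher-order functions, no Python on the workers
    .withColumn("ngrams", F.expr(
        "transform(sequence(0, length(clean) - 3), i -> substring(clean, i + 1, 3))"))
)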
Now, for your question: the pandas_udf takes a pd.Series, so you need to adapt the code, as your variable string is no longer a single string but a Series.
import re

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

@F.pandas_udf(ArrayType(StringType()))
def ngrams_udf(string: pd.Series, n: pd.Series) -> pd.Series:
    """Takes an input string, cleans it and converts to ngrams.
    This script is focussed on cleaning UK company names but can be made generic by removing lines below"""
    n = n.iloc[0]  # n arrives as a Series of the literal value; take the first element
    string = string.str.lower()  # lower case
    string = string.str.encode("ascii", errors="ignore").str.decode("utf8")  # remove non-ASCII chars
    chars_to_remove = [")", "(", ".", "|", "[", "]", "{", "}", "'", "-"]
    rx = '[' + re.escape(''.join(chars_to_remove)) + ']'  # remove punctuation, brackets etc...
    string = string.str.replace(rx, '', regex=True)
    string = string.str.replace('&', 'and')
    string = string.str.replace('limited', 'ltd')
    string = string.str.replace('public limited company', 'plc')
    string = string.str.replace('united states of america', 'usa')
    string = string.str.replace('community interest company', 'cic')
    string = string.str.title()  # normalise case - capital at start of each word
    # collapse multiple spaces into a single one
    string = string.str.replace(r"\s+", ' ', regex=True).str.strip()
    string = ' ' + string + ' '  # pad names for ngrams...
    string = string.apply(lambda x: zip(*[x[i:] for i in range(n)]))
    string = string.apply(lambda x: [''.join(ngram) for ngram in list(x)])
    return string
And using it with:
df.withColumn("ngrams", ngrams_udf(F.col("company"), F.lit(3)))
Upvotes: 2