K. K.

Reputation: 572

PySpark equivalent for lambda function in Pandas UDF

I have written data preprocessing code with a Pandas UDF in PySpark. I'm using a lambda function to extract part of the text from all the records of a column.

Here is what my code looks like:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("string", PandasUDFType.SCALAR)
def get_X(col):
    # take the text after the last comma from each value
    return col.apply(lambda x: x.split(',')[-1] if len(x.split(',')) > 0 else x)

df = df.withColumn('X', get_X(df.Y))

This works fine and gives the desired results, but I need to write the same logic in equivalent native Spark code. Is there a way to do it? Thanks.

Upvotes: 1

Views: 9937

Answers (3)

Hristo Iliev

Reputation: 74355

Given the following DataFrame df:

df.show()
# +-------------+
# |     BENF_NME|
# +-------------+
# |    Doe, John|
# |          Foo|
# |Baz, Quux,Bar|
# +-------------+

You can simply use regexp_extract() to select the first name. The optional non-capturing group (?:.*,\s*)? consumes everything up to and including the last comma plus any whitespace after it, so capture group 1 keeps only the final part:

from pyspark.sql.functions import regexp_extract
df.withColumn('First_Name', regexp_extract(df.BENF_NME, r'(?:.*,\s*)?(.*)', 1)).show()
# +-------------+----------+
# |     BENF_NME|First_Name|
# +-------------+----------+
# |    Doe, John|      John|
# |          Foo|       Foo|
# |Baz, Quux,Bar|       Bar|
# +-------------+----------+

If you don't care about possible leading spaces, substring_index() provides a simple alternative to your original logic:

from pyspark.sql.functions import substring_index
df.withColumn('First_Name', substring_index(df.BENF_NME, ',', -1)).show()
# +-------------+----------+
# |     BENF_NME|First_Name|
# +-------------+----------+
# |    Doe, John|      John|
# |          Foo|       Foo|
# |Baz, Quux,Bar|       Bar|
# +-------------+----------+

In this case the first row's First_Name keeps a leading space, which the right-aligned show() output hides:

df.withColumn(...).collect()[0]
# Row(BENF_NME=u'Doe, John', First_Name=u' John')
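
If the leading space matters, one simple fix is to wrap the expression in the built-in trim(); a minimal sketch:

from pyspark.sql.functions import substring_index, trim
# trim() strips the leading space that substring_index() leaves behind
df.withColumn('First_Name', trim(substring_index(df.BENF_NME, ',', -1))).show()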

If you still want to use a custom function, you need to create a user-defined function (UDF) using udf():

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
get_first_name = udf(lambda s: s.split(',')[-1], StringType())
df.withColumn('First_Name', get_first_name(df.BENF_NME)).show()
# +-------------+----------+
# |     BENF_NME|First_Name|
# +-------------+----------+
# |    Doe, John|      John|
# |          Foo|       Foo|
# |Baz, Quux,Bar|       Bar|
# +-------------+----------+

Note that UDFs are slower than the built-in Spark functions, especially Python UDFs.
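
Also note that plain Python UDFs receive None for null values, so the lambda above would raise an AttributeError on a null BENF_NME. A null-safe variant, as a minimal sketch assuming the column may contain nulls:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# null values reach the lambda as None, so guard before calling split()
get_first_name_safe = udf(lambda s: s.split(',')[-1] if s is not None else None, StringType())
df.withColumn('First_Name', get_first_name_safe(df.BENF_NME)).show()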

Upvotes: 1

jxc

Reputation: 13998

I think a single function, substring_index, is enough for this particular task:

from pyspark.sql.functions import substring_index

df = spark.createDataFrame([(x,) for x in ['f,l', 'g', 'a,b,cd']], ['c1'])

df.withColumn('c2', substring_index('c1', ',', -1)).show()
+------+---+
|    c1| c2|
+------+---+
|   f,l|  l|
|     g|  g|
|a,b,cd| cd|
+------+---+
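
Since substring_index is also available as a Spark SQL built-in, the same thing can be expressed through selectExpr; a small equivalent sketch:

df.selectExpr('c1', "substring_index(c1, ',', -1) AS c2").show()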

Upvotes: 2

pault

Reputation: 43494

You can do the same using when to implement if-then-else logic:

First split the column, then compute its size. If the size is greater than 0, take the last element from the split array. Otherwise, return the original column.

from pyspark.sql.functions import split, size, when

def get_first_name(col):
    col_split = split(col, ',')
    split_size = size(col_split)
    return when(split_size > 0, col_split[split_size-1]).otherwise(col)

As an example, suppose you had the following DataFrame:

df.show()
#+---------+
#| BENF_NME|
#+---------+
#|Doe, John|
#|  Madonna|
#+---------+

You can call the new function just as before:

df = df.withColumn('First_Name', get_first_name(df.BENF_NME))
df.show()
#+---------+----------+
#| BENF_NME|First_Name|
#+---------+----------+
#|Doe, John|      John|
#|  Madonna|   Madonna|
#+---------+----------+
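
On Spark 2.4+, element_at() with a negative index reaches the last array element directly, so the size check (which split() always satisfies for non-null input) can be dropped; a minimal sketch of that alternative:

from pyspark.sql.functions import split, element_at

# element_at with index -1 returns the last element of the array (Spark 2.4+);
# a null input propagates to a null result, like the when/otherwise version
def get_first_name_v2(col):
    return element_at(split(col, ','), -1)

df.withColumn('First_Name', get_first_name_v2(df.BENF_NME)).show()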

Upvotes: 0
