Reputation: 572
I have written a data preprocessing codes in Pandas UDF in PySpark. I'm using lambda function to extract a part of the text from all the records of a column.
Here is how my code looks like:
@pandas_udf("string", PandasUDFType.SCALAR)
def get_X(col):
return col.apply(lambda x: x.split(',')[-1] if len(x.split(',')) > 0 else x)
df = df.withColumn('X', get_first_name(df.Y))
This is working fine and giving the desired results. But I need to write the same piece of logic in Spark equivalent code. Is there a way to do it? Thanks.
Upvotes: 1
Views: 9937
Reputation: 74355
Given the following DataFrame df
:
df.show()
# +-------------+
# | BENF_NME|
# +-------------+
# | Doe, John|
# | Foo|
# |Baz, Quux,Bar|
# +-------------+
You can simply use regexp_extract()
to select the first name:
from pyspark.sql.functions import regexp_extract
df.withColumn('First_Name', regexp_extract(df.BENF_NME, r'(?:.*,\s*)?(.*)', 1)).show()
# +-------------+----------+
# | BENF_NME|First_Name|
# +-------------+----------+
# | Doe, John| John|
# | Foo| Foo|
# |Baz, Quux,Bar| Bar|
# +-------------+----------+
If you don't care about possible leading spaces, substring_index()
provides a simple alternative to your original logic:
from pyspark.sql.functions import substring_index
df.withColumn('First_Name', substring_index(df.BENF_NME, ',', -1)).show()
# +-------------+----------+
# | BENF_NME|First_Name|
# +-------------+----------+
# | Doe, John| John|
# | Foo| Foo|
# |Baz, Quux,Bar| Bar|
# +-------------+----------+
In this case the first row's First_Name
has a leading space:
df.withColumn(...).collect()[0]
# Row(BENF_NME=u'Doe, John', First_Name=u' John'
If you still want to use a custom function, you need to create a user-defined function (UDF) using udf()
:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
get_first_name = udf(lambda s: s.split(',')[-1], StringType())
df.withColumn('First_Name', get_first_name(df.BENF_NME)).show()
# +-------------+----------+
# | BENF_NME|First_Name|
# +-------------+----------+
# | Doe, John| John|
# | Foo| Foo|
# |Baz, Quux,Bar| Bar|
# +-------------+----------+
Note that UDFs are slower than the built-in Spark functions, especially Python UDFs.
Upvotes: 1
Reputation: 13998
I think one function substring_index is enough for this particular task:
from pyspark.sql.functions import substring_index
df = spark.createDataFrame([(x,) for x in ['f,l', 'g', 'a,b,cd']], ['c1'])
df2.withColumn('c2', substring_index('c1', ',', -1)).show()
+------+---+
| c1| c2|
+------+---+
| f,l| l|
| g| g|
|a,b,cd| cd|
+------+---+
Upvotes: 2
Reputation: 43494
You can do the same using when
to implement if-then-else logic:
First split
the column, then compute its size
. If the size is greater than 0, take the last element from the split array. Otherwise, return the original column.
from pyspark.sql.functions import split, size, when
def get_first_name(col):
col_split = split(col, ',')
split_size = size(col_split)
return when(split_size > 0, col_split[split_size-1]).otherwise(col)
As an example, suppose you had the following DataFrame:
df.show()
#+---------+
#| BENF_NME|
#+---------+
#|Doe, John|
#| Madonna|
#+---------+
You can call the new function just as before:
df = df.withColumn('First_Name', get_first_name(df.BENF_NME))
df.show()
#+---------+----------+
#| BENF_NME|First_Name|
#+---------+----------+
#|Doe, John| John|
#| Madonna| Madonna|
#+---------+----------+
Upvotes: 0