Reputation: 1029
I am creating a Spark UDF inside a class. When I execute the code below, I get an exception.
class A(B):
    def __init__(self, spark):
        B.__init__(self)
        self.spark = spark

    def process(self, df):
        df = df.withColumn("col_sub_entry", self.conditions_title("entry_title"))

    def conditions_entry_title(self, x: StringType()):
        if len(x.split(" ") < 3):
            return 0
        else:
            return x

    conditions_title = udf(conditions_entry_title, IntegerType())
Upvotes: 2
Views: 7672
Reputation: 43544
You should always avoid UDFs when the same operation can be done with the built-in API functions.
This is how I would do it:
from pyspark.sql.functions import when, col, size, split

class A:
    def __init__(self, spark):
        # B.__init__(self)
        self.spark = spark

    def process(self, df):
        df = df.withColumn("col_sub_entry", A.conditions_title("entry_title"))
        return df

    @staticmethod
    def conditions_title(someColumn):
        # 0 if the title has fewer than 3 whitespace-separated words, 1 otherwise
        return when(size(split(col(someColumn), r"\s")) < 3, 0).otherwise(1)
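For instance (a minimal usage sketch, assuming an existing SparkSession named spark; the sample titles are made up):

df = spark.createDataFrame(
    [("one two",), ("one two three four",)],
    ["entry_title"],
)
A(spark).process(df).show()
# expected output:
# +------------------+-------------+
# |       entry_title|col_sub_entry|
# +------------------+-------------+
# |           one two|            0|
# |one two three four|            1|
# +------------------+-------------+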
Or even:
@staticmethod
def conditions_title(someColumn):
    return (size(split(col(someColumn), r"\s")) >= 3).cast("int")
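Both variants return a Column expression that Spark can optimize, instead of shipping each row through Python; the second one works because casting a boolean to int yields 0 for false and 1 for true, so the when/otherwise is unnecessary.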
Upvotes: 2
Reputation: 215137
The conditions_title you defined for your udf is not consistent: it seems you are trying to define it as a static member of the class, but you refer to it as an instance method via self. Also, since self is not used in the udf, you can define it as a static method:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType

class A:
    def __init__(self, spark):
        # B.__init__(self)
        self.spark = spark

    def process(self, df):
        df = df.withColumn("col_sub_entry", A.conditions_title("entry_title"))
        return df

    @staticmethod
    @udf(IntegerType())
    def conditions_title(x: StringType):
        # plain Python, evaluated once per row by the udf
        if len(x.split(" ")) < 3:
            return 0
        else:
            return 1
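As a quick check (a hedged sketch, assuming an existing SparkSession named spark; the sample titles are made up):

df = spark.createDataFrame([("one two",), ("one two three four",)], ["entry_title"])
A(spark).process(df).show()
# expected: col_sub_entry is 0 for "one two" (fewer than 3 words) and 1 otherwise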
Upvotes: 1