Reputation: 2411
I'm trying to apply a UDF right after creating a column, but I get this error:
Cannot resolve column name "previous_status" among
which means the column doesn't exist.
I could probably rewrite the UDF as a normal column expression using `F.when` and `otherwise`. The thing is, as you can see, I need a global dict to determine whether I've already seen a given `id`.
```python
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

alreadyAuthorized = {}

def previously_authorized_spark(id, failed, alreadyAuthorized=alreadyAuthorized):
    # 1 if this id has already been recorded as authorized, otherwise 0
    if id in alreadyAuthorized:
        previously_authorized = 1
    else:
        previously_authorized = 0
    # Remember the id once a non-failed status has been seen
    if not failed:
        alreadyAuthorized[id] = True
    return previously_authorized

previously_authorized_udf = udf(lambda x, y: previously_authorized_spark(x, y), IntegerType())

def get_previous_status(data):
    partition = Window.partitionBy("id").orderBy("date")
    data = data.withColumn("previous_status", F.lag(F.col("failed")).over(partition))\
        .withColumn("previously_authorized", previously_authorized_udf(data["id"], data["previous_status"]))
    return data

data = get_previous_status(data)
```
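For comparison, the per-id logic the global dict is meant to capture can be written without any shared state. This is a plain-Python sketch, not Spark, under the assumption that the intent is "has this id had a non-failed row at an earlier date?"; the function name `previously_authorized` and the row schema (dicts with `id`, `date`, `failed` keys) are hypothetical. The state is reset for every id and only earlier rows count, which is the shape of a window partitioned by `id` and ordered by `date`. A module-level dict mutated inside a Python UDF, by contrast, lives in each worker process, so it is not shared across executors, and Spark does not guarantee the order in which rows reach it. An untested Spark counterpart along these lines might be `F.coalesce(F.max((~F.col("failed")).cast("int")).over(partition.rowsBetween(Window.unboundedPreceding, -1)), F.lit(0))`, assuming `failed` is a boolean column.

```python
from collections import defaultdict


def previously_authorized(rows):
    """Flag each row with 1 if the same id had a non-failed row at an earlier date.

    rows: iterable of dicts with hypothetical keys "id", "date", "failed".
    Returns new dicts (inputs are not modified) with an added
    "previously_authorized" key, grouped by id and sorted by date within each id.
    """
    by_id = defaultdict(list)
    for row in rows:
        by_id[row["id"]].append(row)

    result = []
    for group in by_id.values():
        # State is local to one id, like a window partitioned by "id"
        seen_success = False
        for row in sorted(group, key=lambda r: r["date"]):
            # Only rows strictly before this one have been counted so far
            result.append({**row, "previously_authorized": int(seen_success)})
            if not row["failed"]:
                seen_success = True
    return result


rows = [
    {"id": "a", "date": "2019-01-01", "failed": True},
    {"id": "a", "date": "2019-01-02", "failed": False},
    {"id": "a", "date": "2019-01-03", "failed": True},
    {"id": "b", "date": "2019-01-01", "failed": False},
    {"id": "b", "date": "2019-01-02", "failed": True},
]
for r in previously_authorized(rows):
    print(r["id"], r["date"], r["previously_authorized"])
# a 2019-01-01 0
# a 2019-01-02 0
# a 2019-01-03 1
# b 2019-01-01 0
# b 2019-01-02 1
```

Because the result depends only on the rows of one id in date order, it gives the same flags no matter how the input is shuffled.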
Upvotes: 0
Views: 64
Reputation: 1570
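Try using the `col` function to reference the columns because, as @LaSul pointed out, you use `data` before it is reassigned. In the chained expression, `data["previous_status"]` still refers to the DataFrame passed into the function, which doesn't have that column yet, whereas `col("previous_status")` is resolved against the DataFrame returned by the first `withColumn`: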
```python
from pyspark.sql.functions import col
...
data = data.withColumn("previous_status", F.lag(F.col("failed")).over(partition))\
    .withColumn("previously_authorized", previously_authorized_udf(col("id"), col("previous_status")))
...
```
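The failure comes down to Python's evaluation order: the name `data` is only rebound after the whole right-hand side has run, so inside the chain it still names the input DataFrame. The toy model below is plain Python with no Spark involved; `Frame` and `Col` are hypothetical stand-ins, not PySpark classes. It reproduces the behaviour: indexing the old frame fails immediately, a deferred name reference (the role `col()` plays) is resolved against the frame the second call is made on, and splitting the chain into two assignments also works.

```python
class Col:
    """Deferred column reference, resolved only when a frame uses it (like col())."""

    def __init__(self, name):
        self.name = name


class Frame:
    """Toy immutable table: with_column returns a new Frame and leaves self unchanged."""

    def __init__(self, columns):
        self.columns = tuple(columns)

    def _check(self, name):
        if name not in self.columns:
            raise ValueError(f'Cannot resolve column name "{name}" among {self.columns}')

    def __getitem__(self, name):
        # Bound reference: checked right away against *this* frame (like data["x"])
        self._check(name)
        return Col(name)

    def with_column(self, name, expr=None):
        # A deferred reference is checked against the frame being extended
        if isinstance(expr, Col):
            self._check(expr.name)
        return Frame(self.columns + (name,))


def chained_with_bound_ref(data):
    # `data` still names the input frame when the argument is evaluated
    return (data.with_column("previous_status")
                .with_column("previously_authorized", data["previous_status"]))


def chained_with_deferred_ref(data):
    # The name is looked up on the frame returned by the first call
    return (data.with_column("previous_status")
                .with_column("previously_authorized", Col("previous_status")))


def two_statements(data):
    # Rebinding first means data["previous_status"] sees the new column
    data = data.with_column("previous_status")
    return data.with_column("previously_authorized", data["previous_status"])


data = Frame(["id", "date", "failed"])
try:
    chained_with_bound_ref(data)
except ValueError as exc:
    print("bound reference failed:", exc)
print(chained_with_deferred_ref(data).columns)
print(two_statements(data).columns)
```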
Upvotes: 1