LaSul

Reputation: 2411

Pyspark - UDF function right after column creation

I'm trying to apply a UDF right after a column has been created.

But I get this error:

Cannot resolve column name "previous_status" among

Which means that the column doesn't exist.

I could probably rewrite the UDF as a plain column expression using F.when and otherwise. The problem is that, as you can see, I need a global dict to track whether I've already seen a given id.

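from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

alreadyAuthorized = {}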

def previously_authorized_spark(id, failed, alreadyAuthorized = alreadyAuthorized):
    if id in alreadyAuthorized:
        previously_authorized = 1
    else:
        previously_authorized = 0

    if not failed:
        alreadyAuthorized[id] = True

    return previously_authorized

previously_authorized_udf = udf(lambda x, y : previously_authorized_spark(x, y), IntegerType())

def get_previous_status(data):
    partition = Window.partitionBy("id").orderBy("date")

    data = data.withColumn("previous_status", F.lag(F.col("failed")).over(partition))\
                .withColumn("previously_authorized", previously_authorized_udf(data["id"], data["previous_status"]))

    return data

data = get_previous_status(data)
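
For reference, here is a minimal plain-Python sketch of the result the dict-based UDF seems to be aiming for. It assumes the intent is "flag a row if an earlier row with the same id, in date order, did not fail". The function name add_previous_columns and the sample rows are hypothetical and exist only for illustration. The state is kept per id inside the loop rather than in a global dict.

```python
from itertools import groupby
from operator import itemgetter


def add_previous_columns(rows):
    """Model of the intended output, computed per id in date order.

    Hypothetical helper for illustration; it is not part of PySpark.
    """
    out = []
    # Sort by id, then date, so each id's rows are contiguous and ordered.
    ordered = sorted(rows, key=itemgetter("id", "date"))
    for _, group in groupby(ordered, key=itemgetter("id")):
        previous_status = None  # plays the role of lag("failed")
        authorized = False      # per-id state instead of a global dict
        for row in group:
            out.append({
                **row,
                "previous_status": previous_status,
                "previously_authorized": int(authorized),
            })
            previous_status = row["failed"]
            if not row["failed"]:
                authorized = True
    return out


# Made-up sample data, deliberately out of date order.
rows = [
    {"id": "a", "date": "2020-01-02", "failed": False},
    {"id": "a", "date": "2020-01-01", "failed": True},
    {"id": "a", "date": "2020-01-03", "failed": True},
    {"id": "b", "date": "2020-01-01", "failed": False},
]
result = add_previous_columns(rows)
```

Because the state depends only on rows with the same id in date order, it fits the window already defined in get_previous_status. A dict mutated inside a Python UDF is not a safe substitute in Spark, since each worker process keeps its own copy and rows are not guaranteed to arrive in date order.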

Upvotes: 0

Views: 64

Answers (1)

Artem Vovsia

Reputation: 1570

Try using the col function to reference the columns, because, as @LaSul pointed out, you are using data before the new column has been assigned to it:

from pyspark.sql.functions import col

...
    data = data.withColumn("previous_status", F.lag(F.col("failed")).over(partition))\
                .withColumn("previously_authorized", previously_authorized_udf(col("id"), col("previous_status")))

...
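
To see why the original chain fails, here is a toy model in plain Python. The Frame class is a hypothetical stand-in, not the PySpark API. It only mimics two behaviours: a lookup by name on the object is resolved immediately, and adding a column returns a new object instead of changing the old one.

```python
class Frame:
    """Toy immutable frame (hypothetical, for illustration only)."""

    def __init__(self, columns):
        self.columns = tuple(columns)

    def __getitem__(self, name):
        # Like data["x"]: resolved right away against this frame's columns.
        if name not in self.columns:
            raise LookupError(f'Cannot resolve column name "{name}"')
        return name

    def with_column(self, name, expr=None):
        # Returns a new frame; the original is left untouched.
        return Frame(self.columns + (name,))


data = Frame(["id", "date", "failed"])

# In the chained call, data["previous_status"] is looked up on the ORIGINAL
# frame, which does not have that column, so the lookup fails.
try:
    data.with_column("previous_status").with_column(
        "previously_authorized", data["previous_status"]
    )
    error = None
except LookupError as exc:
    error = str(exc)

# Looking the column up on the intermediate frame works.
step = data.with_column("previous_status")
result = step.with_column("previously_authorized", step["previous_status"])
```

col("previous_status") avoids the problem in a different way: it refers to the column by name only, so nothing is looked up on the old data object and the name is resolved against the frame that the second withColumn is called on.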

Upvotes: 1
