user1389739

Reputation: 111

Conditionally get previous row value

I have the following dataset

columns = ['id','trandatetime','code','zip']
data = [('1','2020-02-06T17:33:21.000+0000', '0','35763'),('1','2020-02-06T17:39:55.000+0000', '0','35763'), ('1','2020-02-07T06:06:42.000+0000', '0','35741'), ('1','2020-02-07T06:28:17.000+0000', '4','94043'),('1','2020-02-07T07:12:13.000+0000','0','35802'), ('1','2020-02-07T08:23:29.000+0000', '0','30738')]
df = spark.createDataFrame(data).toDF(*columns)
df= df.withColumn("trandatetime",to_timestamp("trandatetime"))

+---+--------------------+----+-----+
| id|        trandatetime|code|  zip|
+---+--------------------+----+-----+
|  1|2020-02-06T17:33:...|   0|35763|
|  1|2020-02-06T17:39:...|   0|35763|
|  1|2020-02-07T06:06:...|   0|35741|
|  1|2020-02-07T06:28:...|   4|94043|
|  1|2020-02-07T07:12:...|   0|35802|
|  1|2020-02-07T08:23:...|   0|30738|
+---+--------------------+----+-----+

I am trying to get the previous row's zip when code = 0, within a 24-hour time window. This is my attempt, but the row where code is 4 gets a value when it should be null, and the row after the 4 is null when it should have a value.

from pyspark.sql import functions as F
from pyspark.sql import Window

df = df.withColumn("timestamp", F.unix_timestamp("trandatetime"))
w = Window.partitionBy('id').orderBy('timestamp').rangeBetween(-60*60*24, -1)
df = df.withColumn("Card_Present_Last_Zip", F.last(F.when(F.col("code") == '0', F.col("zip"))).over(w))

+---+--------------------+----+-----+----------+---------------------+
| id|        trandatetime|code|  zip| timestamp|Card_Present_Last_Zip|
+---+--------------------+----+-----+----------+---------------------+
|  1|2020-02-06T17:33:...|   0|35763|1581010401|                 null|
|  1|2020-02-06T17:39:...|   0|35763|1581010795|                35763|
|  1|2020-02-07T06:06:...|   0|35741|1581055602|                35763|
|  1|2020-02-07T06:28:...|   4|94043|1581056897|                35741|
|  1|2020-02-07T07:12:...|   0|35802|1581059533|                 null|
|  1|2020-02-07T08:23:...|   0|30738|1581063809|                35802|
+---+--------------------+----+-----+----------+---------------------+

Upvotes: 2

Views: 2455

Answers (2)

AdibP

Reputation: 2939

Put the `last` function (with `ignorenulls` set to `True`) inside another `when` clause, so the window result is only applied to rows with code = '0':

w = Window.partitionBy('id').orderBy('timestamp').rangeBetween(-60*60*24,-1)
df = (df
       .withColumn("timestamp", F.unix_timestamp("trandatetime"))
       .withColumn("Card_Present_Last_Zip", F.when(F.col("code") == '0', F.last(F.when(F.col("code") == '0', F.col("zip")), ignorenulls=True).over(w)))
      )
df.show()

# +---+-------------------+----+-----+----------+---------------------+
# | id|       trandatetime|code|  zip| timestamp|Card_Present_Last_Zip|
# +---+-------------------+----+-----+----------+---------------------+
# |  1|2020-02-06 17:33:21|   0|35763|1581010401|                 null|
# |  1|2020-02-06 17:39:55|   0|35763|1581010795|                35763|
# |  1|2020-02-07 06:06:42|   0|35741|1581055602|                35763|
# |  1|2020-02-07 06:28:17|   4|94043|1581056897|                 null|
# |  1|2020-02-07 07:12:13|   0|35802|1581059533|                35741|
# |  1|2020-02-07 08:23:29|   0|30738|1581063809|                35802|
# +---+-------------------+----+-----+----------+---------------------+
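As a sanity check, the same logic can be sketched in plain Python (no Spark session needed). The 24-hour lookback mirrors `rangeBetween(-60*60*24, -1)`, and the two `code == '0'` checks mirror the nested `when` clauses above:

```python
from datetime import datetime, timedelta

# The question's data, with trandatetime as plain strings
rows = [
    ("2020-02-06 17:33:21", "0", "35763"),
    ("2020-02-06 17:39:55", "0", "35763"),
    ("2020-02-07 06:06:42", "0", "35741"),
    ("2020-02-07 06:28:17", "4", "94043"),
    ("2020-02-07 07:12:13", "0", "35802"),
    ("2020-02-07 08:23:29", "0", "30738"),
]

def last_zip_in_window(rows):
    parsed = [(datetime.strptime(t, "%Y-%m-%d %H:%M:%S"), code, z)
              for t, code, z in rows]
    out = []
    for i, (ts, code, _) in enumerate(parsed):
        if code != "0":
            # outer when(): rows with non-zero code get null
            out.append(None)
            continue
        # range window: earlier rows from ts - 24h up to (but not including) ts
        candidates = [z for (t2, c2, z) in parsed[:i]
                      if ts - timedelta(hours=24) <= t2 < ts and c2 == "0"]
        # last(..., ignorenulls=True): most recent code = '0' zip, else null
        out.append(candidates[-1] if candidates else None)
    return out

print(last_zip_in_window(rows))
# [None, '35763', '35763', None, '35741', '35802']
```

The output matches the `Card_Present_Last_Zip` column shown above.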

Upvotes: 1

Mohana B C

Reputation: 5487

You can use the window function `lag()`:

from pyspark.sql import functions as F
from pyspark.sql import Window

window_spec = Window.partitionBy('id').orderBy('timestamp')

df.withColumn('timestamp', F.unix_timestamp('trandatetime')) \
  .withColumn('prev_zip', F.lag('zip').over(window_spec)) \
  .withColumn('Card_Present_Last_Zip',
              F.when(F.col('code') == 0, F.col('prev_zip')).otherwise(None)) \
  .show()
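One caveat worth noting: `lag('zip')` always reads the immediately preceding row regardless of its code, so on the question's data the row after the code = 4 transaction would pick up that row's zip (94043) rather than the last code = 0 zip. A plain-Python sketch of the same per-row logic:

```python
# (code, zip) pairs from the question's data, in timestamp order
rows = [("0", "35763"), ("0", "35763"), ("0", "35741"),
        ("4", "94043"), ("0", "35802"), ("0", "30738")]

def lag_based(rows):
    out = []
    prev_zip = None  # lag('zip'): the zip from the immediately preceding row
    for code, zip_ in rows:
        # when(code == 0, prev_zip).otherwise(None)
        out.append(prev_zip if code == "0" else None)
        prev_zip = zip_
    return out

print(lag_based(rows))
# [None, '35763', '35763', None, '94043', '35802']
```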

Upvotes: 2
