Mohana B C

Reputation: 5487

Adding a new column to a dataframe with a value which is based on the values from next rows

I have a dataframe as shown below,

+-----+----------+---------+-------+-------------------+
|jobid|fieldmname|new_value|coltype|           createat|
+-----+----------+---------+-------+-------------------+
|    1|  jobstage|  sttaus1|   null|2022-10-10 12:11:34|
|    1| jobstatus|  sttaus2| status|2022-10-10 13:11:34|
|    1|  jobstage|  sttaus3|   null|2022-10-10 14:11:34|
|    1| jobstatus|  sttaus4|   null|2022-10-10 15:11:34|
|    1| jobstatus| sttaus10| status|2022-10-10 16:11:34|
|    1| jobstatus| sttaus11|   null|2022-10-10 17:11:34|
|    2|  jobstage|  sttaus1|   null|2022-10-11 10:11:34|
|    2| jobstatus|  sttaus2| status|2022-11-11 12:11:34|
+-----+----------+---------+-------+-------------------+

// runs as-is in spark-shell; elsewhere, import spark.implicits._ first
val df = Seq(
  (1, "jobstage", "sttaus1", null, "2022-10-10 12:11:34"),
  (1, "jobstatus", "sttaus2", "status", "2022-10-10 13:11:34"),
  (1, "jobstage", "sttaus3", null, "2022-10-10 14:11:34"),
  (1, "jobstatus", "sttaus4", null, "2022-10-10 15:11:34"),
  (1, "jobstatus", "sttaus10", "status", "2022-10-10 16:11:34"),
  (1, "jobstatus", "sttaus11", null, "2022-10-10 17:11:34"),
  (2, "jobstage", "sttaus1", null, "2022-10-11 10:11:34"),
  (2, "jobstatus", "sttaus2", "status", "2022-11-10 12:11:34")
).toDF("jobid", "fieldmname", "new_value", "coltype", "createat")

I need to add a new column that is populated only for rows where fieldmname is "jobstage". Its value should be the latest status for that jobstage, found by looking at the rows that follow it. When picking the status, only rows whose coltype is "status" should be considered.

Expected dataframe:

+-----+----------+---------+-------+-------------------+-------------+
|jobid|fieldmname|new_value|coltype|           createat|latest_status|
+-----+----------+---------+-------+-------------------+-------------+
|    1|  jobstage|  sttaus1|   null|2022-10-10 12:11:34|      sttaus2|
|    1| jobstatus|  sttaus2| status|2022-10-10 13:11:34|             |
|    1|  jobstage|  sttaus3|   null|2022-10-10 14:11:34|     sttaus10|
|    1| jobstatus|  sttaus4|   null|2022-10-10 15:11:34|             |
|    1| jobstatus| sttaus10| status|2022-10-10 16:11:34|             |
|    1| jobstatus| sttaus11|   null|2022-10-10 17:11:34|             |
|    2|  jobstage|  sttaus1|   null|2022-10-11 10:11:34|      sttaus2|
|    2| jobstatus|  sttaus2| status|2022-11-11 12:11:34|             |
+-----+----------+---------+-------+-------------------+-------------+

I tried lead, lag, and row_number, but could not get the expected result.

Upvotes: 0

Views: 95

Answers (1)

samkart

Reputation: 6644

Going by the question's tags, here is a way to do this in pyspark using the first() window function.

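from pyspark.sql import functions as func
from pyspark.sql import Window as wd

# frame: from the current row to the last row of the same jobid, ordered by time
fwd_window = wd.partitionBy('jobid').orderBy('createat').rowsBetween(wd.currentRow, wd.unboundedFollowing)

# new_value for status rows, null for every other row
status_value = func.when((func.col('coltype') == 'status') & (func.col('fieldmname') == 'jobstatus'), func.col('new_value'))

data_sdf. \
    withColumn('latest',
               func.when(func.col('fieldmname') == 'jobstage',
                         func.first(status_value, ignorenulls=True).over(fwd_window)
                         ).
               otherwise(func.lit(''))
               ). \
    show()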

# +-----+----------+---------+-------+-------------------+--------+
# |jobid|fieldmname|new_value|coltype|           createat|  latest|
# +-----+----------+---------+-------+-------------------+--------+
# |    1|  jobstage|  sttaus1|   null|2022-10-10 12:11:34| sttaus2|
# |    1| jobstatus|  sttaus2| status|2022-10-10 13:11:34|        |
# |    1|  jobstage|  sttaus3|   null|2022-10-10 14:11:34|sttaus10|
# |    1| jobstatus|  sttaus4|   null|2022-10-10 15:11:34|        |
# |    1| jobstatus| sttaus10| status|2022-10-10 16:11:34|        |
# |    1| jobstatus| sttaus11|   null|2022-10-10 17:11:34|        |
# |    2|  jobstage|  sttaus1|   null|2022-10-11 10:11:34| sttaus2|
# |    2| jobstatus|  sttaus2| status|2022-11-10 12:11:34|        |
# +-----+----------+---------+-------+-------------------+--------+

So, for each "jobstage" row, it takes the first record among the following rows of the same jobid (ordered by createat) where fieldmname is "jobstatus" and coltype is "status". All other rows get an empty string.
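To make the window logic easier to follow, here is a plain-Python sketch (no Spark involved) of what the expression above computes. It is only an illustration: the `rows` list mirrors the question's sample data, and the helper name `latest_status` is made up for this example.

```python
# Plain-Python sketch of the forward-looking window; not Spark code.
# Each tuple is (jobid, fieldmname, new_value, coltype, createat).
rows = [
    (1, "jobstage", "sttaus1", None, "2022-10-10 12:11:34"),
    (1, "jobstatus", "sttaus2", "status", "2022-10-10 13:11:34"),
    (1, "jobstage", "sttaus3", None, "2022-10-10 14:11:34"),
    (1, "jobstatus", "sttaus4", None, "2022-10-10 15:11:34"),
    (1, "jobstatus", "sttaus10", "status", "2022-10-10 16:11:34"),
    (1, "jobstatus", "sttaus11", None, "2022-10-10 17:11:34"),
    (2, "jobstage", "sttaus1", None, "2022-10-11 10:11:34"),
    (2, "jobstatus", "sttaus2", "status", "2022-11-10 12:11:34"),
]


def latest_status(rows):
    """Return one value per row, in (jobid, createat) order."""
    # Same ordering as partitionBy('jobid').orderBy('createat').
    ordered = sorted(rows, key=lambda r: (r[0], r[4]))
    result = []
    for i, (jobid, field, _value, _coltype, _created) in enumerate(ordered):
        if field != "jobstage":
            result.append("")  # the otherwise('') branch
            continue
        # Scan from the current row to the end of the same jobid and keep
        # the first qualifying value, like first(..., ignorenulls=True).
        match = next(
            (r[2] for r in ordered[i:]
             if r[0] == jobid and r[1] == "jobstatus" and r[3] == "status"),
            None,  # no status row follows: the result stays null
        )
        result.append(match)
    return result


print(latest_status(rows))
# ['sttaus2', '', 'sttaus10', '', '', '', 'sttaus2', '']
```

Note that a "jobstage" row with no later status row in its jobid ends up with null rather than an empty string, which is also what the when/otherwise expression would give.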

Upvotes: 1
