Reputation: 5487
I have a dataframe as shown below,
+-----+----------+---------+-------+-------------------+
|jobid|fieldmname|new_value|coltype| createat|
+-----+----------+---------+-------+-------------------+
| 1| jobstage| sttaus1| null|2022-10-10 12:11:34|
| 1| jobstatus| sttaus2| status|2022-10-10 13:11:34|
| 1| jobstage| sttaus3| null|2022-10-10 14:11:34|
| 1| jobstatus| sttaus4| null|2022-10-10 15:11:34|
| 1| jobstatus| sttaus10| status|2022-10-10 16:11:34|
| 1| jobstatus| sttaus11| null|2022-10-10 17:11:34|
| 2| jobstage| sttaus1| null|2022-10-11 10:11:34|
| 2| jobstatus| sttaus2| status|2022-11-11 12:11:34|
+-----+----------+---------+-------+-------------------+
Seq(
(1, "jobstage", "sttaus1", "null", "2022-10-10 12:11:34"),
(1, "jobstatus", "sttaus2", "status", "2022-10-10 13:11:34"),
(1, "jobstage", "sttaus3", "null", "2022-10-10 14:11:34"),
(1, "jobstatus", "sttaus4", "null", "2022-10-10 15:11:34"),
(1, "jobstatus", "sttaus10", "status", "2022-10-10 16:11:34"),
(1, "jobstatus", "sttaus11", null, "2022-10-10 17:11:34"),
(2, "jobstage", "sttaus1", "null", "2022-10-11 10:11:34"),
(2, "jobstatus", "sttaus2", "status", "2022-11-10 12:11:34")
).toDF("jobid", "fieldmname", "new_value", "coltype", "createat")
I need to add a new column that is populated only for rows where fieldmname is "jobstage". Its value should be the latest status for that jobstage, taken from the subsequent rows; when picking that status, only rows whose coltype is "status" should be considered.
Expected dataframe:
+-----+----------+---------+-------+-------------------+-------------+
|jobid|fieldmname|new_value|coltype| createat|latest_status|
+-----+----------+---------+-------+-------------------+-------------+
| 1| jobstage| sttaus1| null|2022-10-10 12:11:34| sttaus2|
| 1| jobstatus| sttaus2| status|2022-10-10 13:11:34| |
| 1| jobstage| sttaus3| null|2022-10-10 14:11:34| sttaus10|
| 1| jobstatus| sttaus4| null|2022-10-10 15:11:34| |
| 1| jobstatus| sttaus10| status|2022-10-10 16:11:34| |
| 1| jobstatus| sttaus11| null|2022-10-10 17:11:34| |
| 2| jobstage| sttaus1| null|2022-10-11 10:11:34| sttaus2|
| 2| jobstatus| sttaus2| status|2022-11-11 12:11:34| |
+-----+----------+---------+-------+-------------------+-------------+
I tried with lead, lag, row_number but not getting expected result.
Upvotes: 0
Views: 95
Reputation: 6644
The question is tagged pyspark, so here's a way to do it in PySpark using the first() window function.
import sys

from pyspark.sql import functions as func
from pyspark.sql.window import Window as wd

data_sdf. \
    withColumn('latest',
               func.when(func.col('fieldmname') == 'jobstage',
                         func.first(
                             # emit new_value only on "status" jobstatus rows; everything else is null
                             func.when((func.col('coltype') == 'status') & (func.col('fieldmname') == 'jobstatus'),
                                       func.col('new_value')),
                             ignorenulls=True
                         ).
                         # frame spans the current row and all following rows in the jobid partition;
                         # wd.unboundedFollowing works in place of sys.maxsize as well
                         over(wd.partitionBy('jobid').orderBy('createat').rowsBetween(0, sys.maxsize))
                         ).
               otherwise(func.lit(''))
               ). \
    show()
# +-----+----------+---------+-------+-------------------+--------+
# |jobid|fieldmname|new_value|coltype| createat| latest|
# +-----+----------+---------+-------+-------------------+--------+
# | 1| jobstage| sttaus1| null|2022-10-10 12:11:34| sttaus2|
# | 1| jobstatus| sttaus2| status|2022-10-10 13:11:34| |
# | 1| jobstage| sttaus3| null|2022-10-10 14:11:34|sttaus10|
# | 1| jobstatus| sttaus4| null|2022-10-10 15:11:34| |
# | 1| jobstatus| sttaus10| status|2022-10-10 16:11:34| |
# | 1| jobstatus| sttaus11| null|2022-10-10 17:11:34| |
# | 2| jobstage| sttaus1| null|2022-10-11 10:11:34| sttaus2|
# | 2| jobstatus| sttaus2| status|2022-11-10 12:11:34| |
# +-----+----------+---------+-------+-------------------+--------+
So, for each "jobstage" row it takes the new_value from the first subsequent record (within the same jobid, ordered by createat) where fieldmname is "jobstatus" and coltype is "status".
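For anyone who wants to see what the window is doing without spinning up Spark, here is a plain-Python sketch of the same forward-looking lookup on the sample data (the helper name `latest_status` is just illustrative, not part of any Spark API):

```python
# Sample rows from the question: (jobid, fieldmname, new_value, coltype, createat)
rows = [
    (1, "jobstage", "sttaus1", "null", "2022-10-10 12:11:34"),
    (1, "jobstatus", "sttaus2", "status", "2022-10-10 13:11:34"),
    (1, "jobstage", "sttaus3", "null", "2022-10-10 14:11:34"),
    (1, "jobstatus", "sttaus4", "null", "2022-10-10 15:11:34"),
    (1, "jobstatus", "sttaus10", "status", "2022-10-10 16:11:34"),
    (1, "jobstatus", "sttaus11", None, "2022-10-10 17:11:34"),
    (2, "jobstage", "sttaus1", "null", "2022-10-11 10:11:34"),
    (2, "jobstatus", "sttaus2", "status", "2022-11-10 12:11:34"),
]

def latest_status(rows):
    # Order by (jobid, createat), mirroring partitionBy('jobid').orderBy('createat')
    ordered = sorted(rows, key=lambda r: (r[0], r[4]))
    out = []
    for i, (jobid, field, value, coltype, createat) in enumerate(ordered):
        latest = ""
        if field == "jobstage":
            # Scan forward within the same jobid partition, like first(...,
            # ignorenulls=True) over rowsBetween(0, sys.maxsize): take the first
            # following "jobstatus" row whose coltype is "status".
            for j in range(i, len(ordered)):
                jid2, f2, v2, c2, _ = ordered[j]
                if jid2 != jobid:
                    break
                if f2 == "jobstatus" and c2 == "status":
                    latest = v2
                    break
        out.append((jobid, field, value, coltype, createat, latest))
    return out

print([r[5] for r in latest_status(rows)])
# → ['sttaus2', '', 'sttaus10', '', '', '', 'sttaus2', '']
```

This matches the `latest` column in the output above, which may help when debugging variations of the window frame.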
Upvotes: 1