I am new to PySpark and have the case below that I need to solve. Could someone please help? I searched Google and Stack Overflow for similar issues, but unfortunately I did not find a solution.
Problem:
I have a DataFrame with two columns: obsolute (the obsolete value) and replace (the value that replaces it).
DataFrame:
In the DataFrame above, each obsolete value is updated to the value in the replace column. For example, 10 becomes 12; in the next row, 12 becomes 14; and in the third row, 14 becomes 16. Each value is updated again in the next row.
Because the values form a chain, the first three rows belong to one group (highlighted in red), and the latest value for all of them is the last replace value in the chain, 16. For the other two rows (highlighted in yellow), 19 is the connecting value, so the latest value for both rows is 20.
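To make the expected result concrete, here is a minimal plain-Python sketch (no Spark involved) that follows each obsolete value through the obsolete-to-replace mapping until it reaches a value that is never replaced. The five input rows are an assumption reconstructed from the description above, since the original table is not shown, and `latest_values` is a hypothetical helper name. It also assumes the chains contain no cycles.

```python
# Plain-Python sketch (not Spark): follow each obsolete value through the
# obsolete -> replace mapping until reaching a value that is never replaced.
def latest_values(pairs):
    """Map each obsolete value to the final value at the end of its chain.

    `pairs` is a list of (obsolete, replace) tuples. Assumes the chains have
    no cycles; a cycle would make the while loop below run forever.
    """
    mapping = dict(pairs)
    result = {}
    for old, _ in pairs:
        current = mapping[old]
        # Keep following the chain while the current value is itself replaced.
        while current in mapping:
            current = mapping[current]
        result[old] = current
    return result

# Assumed input, reconstructed from the question's description.
rows = [(10, 12), (12, 14), (14, 16), (18, 19), (19, 20)]
print(latest_values(rows))  # {10: 16, 12: 16, 14: 16, 18: 20, 19: 20}
```

Unlike a row-by-row comparison, this lookup does not care about row order, but it collects everything into a driver-side dict, so it only illustrates the desired semantics rather than scaling to a large DataFrame.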
Expected Output
I tried using map and foreach in PySpark but did not get the desired result. Could someone please help me solve this?
First, discover all chains and assign a group ID to each one. After grouping, you can apply the f.last() function to return the desired value.
This assumes that, once the rows are sorted by obsolute, the rows of each chain are adjacent.
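The grouping step relies on two window functions: `lag` compares each row with the previous one, and a running sum of the chain breaks turns them into group IDs. The following plain-Python sketch emulates that logic on the same nine rows used in the PySpark code, so the intermediate `chain` and `group` columns can be checked by hand. It is an illustration of the idea, not Spark code, and `assign_groups` is a hypothetical name.

```python
# Plain-Python emulation of the two window steps:
#   chain = coalesce(lag(replace) == obsolute, True)   ordered by obsolute
#   group = running sum of (chain == False)            ordered by obsolute
def assign_groups(pairs):
    rows = sorted(pairs)   # like Window.orderBy('obsolute'); obsolute values are unique
    out = []
    prev_replace = None    # lag() returns null for the first row
    group = 0
    for obsolute, replace in rows:
        # The first row has no predecessor, so coalesce(..., True) marks it as chained.
        chain = True if prev_replace is None else prev_replace == obsolute
        if not chain:
            group += 1     # a break in the chain starts a new group
        out.append((obsolute, replace, chain, group))
        prev_replace = replace
    return out

pairs = [(10, 12), (12, 14), (14, 16), (18, 19), (19, 20),
         (22, 24), (24, 25), (25, 27), (29, 30)]
result = assign_groups(pairs)
for row in result:
    print(row)
```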
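from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    (10, 12),
    (12, 14),
    (14, 16),
    (18, 19),
    (19, 20),
    (22, 24),
    (24, 25),
    (25, 27),
    (29, 30)
], ('obsolute', 'replace'))

# Order all rows by the obsolete value. Without partitionBy, Spark moves
# every row into a single partition, which is fine for small data.
w = Window.orderBy('obsolute')

df = (df
      # chain is true when the previous row's replace equals this row's obsolute;
      # lag() is null on the first row, so coalesce() treats it as chained.
      .withColumn('chain', f.coalesce(f.lag('replace').over(w) == f.col('obsolute'), f.lit(True)))
      # Each break (chain == false) increments a running counter: the group ID.
      .withColumn('group', f.sum((f.col('chain') == f.lit(False)).cast('int')).over(w)))

# Intermediate result:
# +--------+-------+-----+-----+
# |obsolute|replace|chain|group|
# +--------+-------+-----+-----+
# |10      |12     |true |0    |
# |12      |14     |true |0    |
# |14      |16     |true |0    |
# |18      |19     |false|1    |
# |19      |20     |true |1    |
# |22      |24     |false|2    |
# |24      |25     |true |2    |
# |25      |27     |true |2    |
# |29      |30     |false|3    |
# +--------+-------+-----+-----+

# Order rows inside each group and make the frame span the whole group.
# Without an ordering, f.last() is not guaranteed to pick the chain's last row.
w = (Window.partitionBy('group')
     .orderBy('obsolute')
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df = df.select('obsolute', 'replace', f.last('replace').over(w).alias('latest'))
df.show(truncate=False)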
Output
+--------+-------+------+
|obsolute|replace|latest|
+--------+-------+------+
|10      |12     |16    |
|12      |14     |16    |
|14      |16     |16    |
|18      |19     |20    |
|19      |20     |20    |
|22      |24     |27    |
|24      |25     |27    |
|25      |27     |27    |
|29      |30     |30    |
+--------+-------+------+
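The final step picks, within each group, the replace value of the row with the largest obsolute. That is what f.last('replace') returns when the window is ordered by obsolute and its frame covers the whole partition. The plain-Python sketch below emulates that step, taking the (obsolute, replace, group) rows from the intermediate table as given input. `latest_per_group` is a hypothetical name used only for illustration.

```python
from collections import defaultdict

# Emulate f.last('replace') over a window partitioned by group, ordered by
# obsolute, with a frame spanning the whole partition.
def latest_per_group(rows):
    by_group = defaultdict(list)
    for obsolute, replace, group in rows:
        by_group[group].append((obsolute, replace))
    # max() on (obsolute, replace) tuples picks the row with the largest obsolute.
    latest = {g: max(members)[1] for g, members in by_group.items()}
    return [(o, r, latest[g]) for o, r, g in rows]

# (obsolute, replace, group) rows copied from the intermediate table.
grouped = [(10, 12, 0), (12, 14, 0), (14, 16, 0), (18, 19, 1), (19, 20, 1),
           (22, 24, 2), (24, 25, 2), (25, 27, 2), (29, 30, 3)]
final = latest_per_group(grouped)
for row in final:
    print(row)
```

Ordering by obsolute is what makes "last" well defined here. Over an unordered partition, Spark's last() may return any row's value, depending on how the data happens to be arranged.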