Reputation: 13585
I have a dataframe in the following format:
+----------+-------+----------+---------+-----------------------+---------+---------+
|rownum |viewid |skillid |parentId |post_timestamp |is_skill |column A |
+----------+-------+----------+---------+-----------------------+---------+---------+
|1 |251 |b |xyz12 |2019-01-31 09:24:02.868|true |abcde |
|2 |251 |b |abc34 |2019-01-31 10:24:02.868|false |453aw |
|3 |251 |b |abc34 |2019-01-31 11:24:02.868|false |abcde |
|4 |94 |a |ghi23 |2019-01-31 02:28:05.107|false |bbbbb |
|5 |94 |a |yui67 |2019-01-31 09:06:57.976|true |nnnn |
|6 |94 |a |qwe12 |2019-01-31 09:24:02.868|false |2n21q |
|7 |94 |a |qwe12 |2019-01-31 10:06:57.976|false |nnnnq |
|8 |94 |a |rty87 |2019-01-31 15:07:57.976|true |1234 |
|9 |94 |a |bnm22 |2019-01-31 16:28:05.107|true |1234 |
|10 |94 |a |bnm22 |2019-01-31 17:28:05.107|true |6789 |
|11 |94 |b |tyu12 |2019-01-31 09:24:02.868|true |6789 |
+----------+-------+----------+---------+-----------------------+---------+---------+
For each group of viewid and skillid: if the current row's parentId is not equal to the previous row's parentId, then find the latest row in that group with is_skill = true, and check whether the current row's column A value is equal to that row's column A value.
Column matchedParentId = df.col("parentId")
        .equalTo(functions.lag(df.col("parentId"), 1).over(windowSpec)); // windowSpec: partitionBy(viewid, skillid), orderBy(post_timestamp)
Now how can I walk back through the dataframe until is_skill is true? I guess going back should be doable, since the dataframe is ordered by timestamp.
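As a plain-Scala sketch of that lag comparison (ignoring Spark entirely; SkillRow and matchedParentId are hypothetical names, not part of any Spark API), the previous-row check within one ordered (viewid, skillid) group looks like:

```scala
// Hypothetical plain-Scala analogue of lag("parentId", 1) within one
// (viewid, skillid) group that is already ordered by post_timestamp.
case class SkillRow(rownum: Int, parentId: String, isSkill: Boolean, columnA: String)

// For each row, compare its parentId with the previous row's;
// the first row has no predecessor, hence None.
def matchedParentId(rows: Seq[SkillRow]): Seq[Option[Boolean]] =
  if (rows.isEmpty) Seq.empty
  else None +: rows.sliding(2).collect {
    case Seq(prev, cur) => Some(cur.parentId == prev.parentId)
  }.toSeq
```

For the viewid = 251 group above this yields None, Some(false), Some(true), i.e. null, false, true once mapped back to a column.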
Upvotes: 0
Views: 376
Reputation: 478
I use Scala, but the solution I came up with is:
- Use window functions to find the row number of the last row with is_skill = true before each row where parentId is not equal to the previous parentId
- Self-join the dataframe to match those rows
Is the desired output as follows?
+------+------+-------+--------+--------------------+--------+--------+---------------+--------+
|rownum|viewid|skillid|parentId| post_timestamp|is_skill|column A|matchedParentId|isAEqual|
+------+------+-------+--------+--------------------+--------+--------+---------------+--------+
| 1| 251| b| xyz12|20190131 09:24:02...| true| abcde| null| true|
| 2| 251| b| abc34|20190131 10:24:02...| false| 453aw| false| false|
| 3| 251| b| abc34|20190131 11:24:02...| false| abcde| true| true|
| 5| 94| a| yui67|20190131 09:06:57...| true| nnnn| false| true|
| 6| 94| a| qwe12|20190131 09:24:02...| false| 2n21q| false| false|
| 7| 94| a| qwe12|20190131 10:06:57...| false| nnnnq| true| false|
| 8| 94| a| rty87|20190131 15:07:57...| true| 1234| false| true|
| 9| 94| a| bnm22|20190131 16:28:05...| true| 1234| false| true|
| 10| 94| a| bnm22|20190131 17:28:05...| true| 6789| true| true|
| 11| 94| b| tyu12|20190131 09:24:02...| true| 6789| null| true|
+------+------+-------+--------+--------------------+--------+--------+---------------+--------+
Here is the code:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
  (1,  251, "b", "xyz12", "20190131 09:24:02.868", true,  "abcde"),
  (2,  251, "b", "abc34", "20190131 10:24:02.868", false, "453aw"),
  (3,  251, "b", "abc34", "20190131 11:24:02.868", false, "abcde"),
  (4,  94,  "a", "ghi23", "20190131 02:28:05.107", false, "bbbbb"),
  (5,  94,  "a", "yui67", "20190131 09:06:57.976", true,  "nnnn"),
  (6,  94,  "a", "qwe12", "20190131 09:24:02.868", false, "2n21q"),
  (7,  94,  "a", "qwe12", "20190131 10:06:57.976", false, "nnnnq"),
  (8,  94,  "a", "rty87", "20190131 15:07:57.976", true,  "1234"),
  (9,  94,  "a", "bnm22", "20190131 16:28:05.107", true,  "1234"),
  (10, 94,  "a", "bnm22", "20190131 17:28:05.107", true,  "6789"),
  (11, 94,  "b", "tyu12", "20190131 09:24:02.868", true,  "6789")).
  toDF("rownum", "viewid", "skillid", "parentId", "post_timestamp", "is_skill", "column A")
val w = Window.partitionBy("viewid", "skillid").orderBy("post_timestamp")
val df2 = df.withColumn("matchedParentId", lag($"parentId", 1).over(w).equalTo($"parentId")).
withColumn("rank", rank.over(w)).withColumn("is_skill_int", when($"is_skill", 1).otherwise(0)).
withColumn("test", max($"is_skill_int" * $"rank").over(w))
val df3 = df2.as("df_left").
join(df2.as("df_right"), $"df_left.viewid".equalTo($"df_right.viewid").
and($"df_left.skillid".equalTo($"df_right.skillid")).
and($"df_left.rank".equalTo($"df_right.test"))).
withColumn("isAEqual", $"df_left.column A".equalTo($"df_right.column A")).
select("df_right.rownum", "df_right.viewid", "df_right.skillid", "df_right.parentId", "df_right.post_timestamp", "df_right.is_skill", "df_right.column A", "df_right.matchedParentId", "isAEqual").
orderBy("rownum")
df3.show
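A word on the max($"is_skill_int" * $"rank").over(w) column: because the window has an orderBy, max is a running maximum, so at each row it holds the rank of the latest is_skill = true row seen so far in that group. A plain-Scala sketch of the same idea (latestSkillRank is a hypothetical helper, not part of the code above):

```scala
// Running maximum of (rank if is_skill else 0): at each position this is
// the 1-based rank of the latest is_skill = true row seen so far in the
// group, or 0 if none has occurred yet.
def latestSkillRank(isSkill: Seq[Boolean]): Seq[Int] =
  isSkill.zipWithIndex
    .map { case (s, i) => if (s) i + 1 else 0 } // rank is 1-based
    .scanLeft(0)(math.max)                      // running maximum
    .tail
```

The self-join then matches each row's "test" value against the other side's "rank", pairing every row with its latest preceding is_skill = true row.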
Upvotes: 1
Reputation: 1932
Here is the approach I would recommend.
Upvotes: 0