Himanshu Yadav

Reputation: 13585

Spark: Going backwards in a dataframe until a condition is met

I have a dataframe in the following format:

+----------+-------+----------+---------+-----------------------+---------+---------+
|rownum    |viewid |skillid   |parentId |post_timestamp         |is_skill |column A |
+----------+-------+----------+---------+-----------------------+---------+---------+
|1         |251    |b         |xyz12    |2019-01-31 09:24:02.868|true     |abcde    |
|2         |251    |b         |abc34    |2019-01-31 10:24:02.868|false    |453aw    |
|3         |251    |b         |abc34    |2019-01-31 11:24:02.868|false    |abcde    |
|4         |94     |a         |ghi23    |2019-01-31 02:28:05.107|false    |bbbbb    |
|5         |94     |a         |yui67    |2019-01-31 09:06:57.976|true     |nnnn     |
|6         |94     |a         |qwe12    |2019-01-31 09:24:02.868|false    |2n21q    |
|7         |94     |a         |qwe12    |2019-01-31 10:06:57.976|false    |nnnnq    |
|8         |94     |a         |rty87    |2019-01-31 15:07:57.976|true     |1234     |
|9         |94     |a         |bnm22    |2019-01-31 16:28:05.107|true     |1234     |
|10        |94     |a         |bnm22    |2019-01-31 17:28:05.107|true     |6789     |
|11        |94     |b         |tyu12    |2019-01-31 09:24:02.868|true     |6789     |
+----------+-------+----------+---------+-----------------------+---------+---------+

For each group of viewid and skillid: if the current row's parentId is not equal to the previous row's parentId, find the latest row in that group where is_skill is true, and check whether the current row's column A value is not equal to that row's column A value.

So far I have the parentId comparison (Java):

WindowSpec w = Window.partitionBy("viewid", "skillid").orderBy("post_timestamp");
Column matchedParentId = df.col("parentId").equalTo(functions.lag("parentId", 1).over(w));

Now, how can I walk back through the dataframe until is_skill is true? I guess going back should be doable, since the dataframe is ordered by timestamp.
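One untested idea (sketched in Scala; I am not sure last with ignoreNulls behaves as I hope here) is to carry "column A" forward from the most recent is_skill = true row with a window function:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val w = Window.partitionBy("viewid", "skillid").orderBy("post_timestamp")

// when() yields null on non-skill rows, so last(..., ignoreNulls = true) picks up
// "column A" of the latest is_skill = true row at or before the current row
val withLastSkillA = df.
  withColumn("lastSkillA", last(when($"is_skill", $"column A"), ignoreNulls = true).over(w)).
  withColumn("isAEqual", $"column A".equalTo($"lastSkillA"))

Is this on the right track?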

Upvotes: 0

Views: 376

Answers (2)

Gofrette

Reputation: 478

I use Scala, but the solution I came up with is:

- Use window functions to find the row number of the last row with is_skill = true before a row whose parentId differs from the previous row's parentId
- Self-join the dataframe to match those rows

Is the desired output as follows?

+------+------+-------+--------+--------------------+--------+--------+---------------+--------+
|rownum|viewid|skillid|parentId|      post_timestamp|is_skill|column A|matchedParentId|isAEqual|
+------+------+-------+--------+--------------------+--------+--------+---------------+--------+
|     1|   251|      b|   xyz12|20190131 09:24:02...|    true|   abcde|           null|    true|
|     2|   251|      b|   abc34|20190131 10:24:02...|   false|   453aw|          false|   false|
|     3|   251|      b|   abc34|20190131 11:24:02...|   false|   abcde|           true|    true|
|     5|    94|      a|   yui67|20190131 09:06:57...|    true|    nnnn|          false|    true|
|     6|    94|      a|   qwe12|20190131 09:24:02...|   false|   2n21q|          false|   false|
|     7|    94|      a|   qwe12|20190131 10:06:57...|   false|   nnnnq|           true|   false|
|     8|    94|      a|   rty87|20190131 15:07:57...|    true|    1234|          false|    true|
|     9|    94|      a|   bnm22|20190131 16:28:05...|    true|    1234|          false|    true|
|    10|    94|      a|   bnm22|20190131 17:28:05...|    true|    6789|           true|    true|
|    11|    94|      b|   tyu12|20190131 09:24:02...|    true|    6789|           null|    true|
+------+------+-------+--------+--------------------+--------+--------+---------------+--------+

Here is the code:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1,  251, "b", "xyz12", "20190131 09:24:02.868", true,  "abcde"),
  (2,  251, "b", "abc34", "20190131 10:24:02.868", false, "453aw"),
  (3,  251, "b", "abc34", "20190131 11:24:02.868", false, "abcde"),
  (4,  94,  "a", "ghi23", "20190131 02:28:05.107", false, "bbbbb"),
  (5,  94,  "a", "yui67", "20190131 09:06:57.976", true,  "nnnn"),
  (6,  94,  "a", "qwe12", "20190131 09:24:02.868", false, "2n21q"),
  (7,  94,  "a", "qwe12", "20190131 10:06:57.976", false, "nnnnq"),
  (8,  94,  "a", "rty87", "20190131 15:07:57.976", true,  "1234"),
  (9,  94,  "a", "bnm22", "20190131 16:28:05.107", true,  "1234"),
  (10, 94,  "a", "bnm22", "20190131 17:28:05.107", true,  "6789"),
  (11, 94,  "b", "tyu12", "20190131 09:24:02.868", true,  "6789")).
  toDF("rownum", "viewid", "skillid", "parentId", "post_timestamp", "is_skill", "column A")

val w = Window.partitionBy("viewid", "skillid").orderBy("post_timestamp")

val df2 = df.
  withColumn("matchedParentId", lag($"parentId", 1).over(w).equalTo($"parentId")).
  withColumn("rank", rank.over(w)).
  withColumn("is_skill_int", when($"is_skill", 1).otherwise(0)).
  // running max over the window: the rank of the latest is_skill = true row so far
  withColumn("test", max($"is_skill_int" * $"rank").over(w))

// self-join: each row on the right pairs with the row on the left whose rank
// equals the right row's "test", i.e. the latest is_skill = true row before it
val df3 = df2.as("df_left").
  join(df2.as("df_right"), $"df_left.viewid".equalTo($"df_right.viewid").
    and($"df_left.skillid".equalTo($"df_right.skillid")).
    and($"df_left.rank".equalTo($"df_right.test"))).
  withColumn("isAEqual", $"df_left.column A".equalTo($"df_right.column A")).
  select("df_right.rownum", "df_right.viewid", "df_right.skillid", "df_right.parentId",
    "df_right.post_timestamp", "df_right.is_skill", "df_right.column A",
    "df_right.matchedParentId", "isAEqual").
  orderBy("rownum")

df3.show

Upvotes: 1

Ranga Vure

Reputation: 1932

Here is the approach I would recommend (a rough sketch follows the list):

  1. Group by (viewid, skillid) and collect the grouped records as a list
  2. Implement a UDF that takes the list, traverses it, and applies the comparison logic
  3. Select the records based on the UDF's return value (likely a timestamp)
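
Untested sketch of the idea in Scala (the struct field positions and the comparison inside the UDF are illustrative only, based on the columns in the question):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import spark.implicits._

// 1. collect each (viewid, skillid) group into a timestamp-sorted list of structs
val grouped = df.groupBy("viewid", "skillid").agg(
  sort_array(collect_list(struct(
    $"post_timestamp", $"rownum", $"parentId", $"is_skill", $"column A"))).as("rows"))

// 2. traverse each list: when parentId changes, walk back to the latest
//    is_skill = true row and compare "column A"
//    (struct fields: 1 = rownum, 2 = parentId, 3 = is_skill, 4 = column A)
val checkGroup = udf { rows: Seq[Row] =>
  rows.indices.collect {
    case i if i > 0 && rows(i - 1).getString(2) != rows(i).getString(2) =>
      val lastSkill = rows.take(i).reverse.find(_.getBoolean(3))
      (rows(i).getInt(1), lastSkill.exists(_.getString(4) == rows(i).getString(4)))
  }
}

// 3. explode the result back into one row per checked record
val picked = grouped.
  select($"viewid", $"skillid", explode(checkGroup($"rows")).as("check")).
  select($"viewid", $"skillid", $"check._1".as("rownum"), $"check._2".as("isAEqual"))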

Upvotes: 0
