Anneso

Reputation: 613

Use a Window to count rows matching an if condition in Scala 2

I already posted a similar question, and someone gave me a trick to avoid using the "if condition".

Here I am in a similar position, and I cannot find any trick to avoid it...

I have a DataFrame:

var df = sc.parallelize(Array(
(1,  "2017-06-29 10:53:53.0","2017-06-25 15:00:53.0","boulanger.fr"), 
(2,  "2017-07-05 10:48:57.0","2017-09-05 09:00:53.0","patissier.fr"), 
(3,  "2017-06-28 10:31:42.0","2017-02-28 20:31:42.0","boulanger.fr"), 
(4,  "2017-08-21 17:31:12.0","2017-10-21 10:29:12.0","patissier.fr"), 
(5,  "2017-07-28 11:22:42.0","2017-05-28 11:22:42.0","boulanger.fr"), 
(6,  "2017-08-23 17:03:43.0","2017-07-23 09:03:43.0","patissier.fr"), 
(7,  "2017-08-24 16:08:07.0","2017-08-22 16:08:07.0","boulanger.fr"), 
(8,  "2017-08-31 17:20:43.0","2017-05-22 17:05:43.0","patissier.fr"), 
(9,  "2017-09-04 14:35:38.0","2017-07-04 07:30:25.0","boulanger.fr"), 
(10, "2017-09-07 15:10:34.0","2017-07-29 12:10:34.0","patissier.fr"))).toDF("id", "date1","date2", "mail")

df = df.withColumn("date1", (unix_timestamp($"date1", "yyyy-MM-dd HH:mm:ss").cast("timestamp")))
df = df.withColumn("date2", (unix_timestamp($"date2", "yyyy-MM-dd HH:mm:ss").cast("timestamp")))

df = df.orderBy("date1", "date2")

It looks like:

+---+---------------------+---------------------+------------+
|id |date1                |date2                |mail        |
+---+---------------------+---------------------+------------+
|3  |2017-06-28 10:31:42.0|2017-02-28 20:31:42.0|boulanger.fr|
|1  |2017-06-29 10:53:53.0|2017-06-25 15:00:53.0|boulanger.fr|
|2  |2017-07-05 10:48:57.0|2017-09-05 09:00:53.0|patissier.fr|
|5  |2017-07-28 11:22:42.0|2017-05-28 11:22:42.0|boulanger.fr|
|4  |2017-08-21 17:31:12.0|2017-10-21 10:29:12.0|patissier.fr|
|6  |2017-08-23 17:03:43.0|2017-07-23 09:03:43.0|patissier.fr|
|7  |2017-08-24 16:08:07.0|2017-08-22 16:08:07.0|boulanger.fr|
|8  |2017-08-31 17:20:43.0|2017-05-22 17:05:43.0|patissier.fr|
|9  |2017-09-04 14:35:38.0|2017-07-04 07:30:25.0|boulanger.fr|
|10 |2017-09-07 15:10:34.0|2017-07-29 12:10:34.0|patissier.fr|
+---+---------------------+---------------------+------------+

For each id I want to count, among all the other rows, the number of rows with:

  1. a date1 in [my_current_date1 - 60 days, my_current_date1 - 1 day]
  2. a date2 < my_current_date1
  3. the same mail as my current mail

If I look at the row with id 5, I want to return the number of rows with:

  1. date1 in [2017-05-29 11:22:42.0, 2017-07-27 11:22:42.0]
  2. date2 < 2017-07-28 11:22:42.0
  3. mail = boulanger.fr

--> The result would be 2 (corresponding to id 1 and id 3)
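Just to make that target concrete, the id-5 case can be checked by hand in plain Scala, without Spark. The `ts` helper and the hard-coded rows below are only for illustration, using the parsed timestamps from the table above:

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
def ts(s: String): LocalDateTime = LocalDateTime.parse(s, fmt)

// (id, date1, date2) for the boulanger.fr rows only
val boulanger = Seq(
  (3, ts("2017-06-28 10:31:42"), ts("2017-02-28 20:31:42")),
  (1, ts("2017-06-29 10:53:53"), ts("2017-06-25 15:00:53")),
  (5, ts("2017-07-28 11:22:42"), ts("2017-05-28 11:22:42")),
  (7, ts("2017-08-24 16:08:07"), ts("2017-08-22 16:08:07")),
  (9, ts("2017-09-04 14:35:38"), ts("2017-07-04 07:30:25")))

val cur = ts("2017-07-28 11:22:42")  // date1 of id 5
val lo  = cur.minusDays(60)          // my_current_date1 - 60 days
val hi  = cur.minusDays(1)           // my_current_date1 - 1 day

// other rows with date1 in [lo, hi] (inclusive) and date2 < cur
val matches = boulanger.filter { case (id, d1, d2) =>
  id != 5 && !d1.isBefore(lo) && !d1.isAfter(hi) && d2.isBefore(cur)
}.map(_._1)
// matches == List(3, 1), i.e. a count of 2
```

This is just the specification written out; the question is how to express it with a Window.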

So I would like to do something like:

import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy("mail")
  .orderBy(col("date1").cast("long"))
  .rangeBetween(-60 * 24 * 60 * 60, -1 * 24 * 60 * 60)
df = df.withColumn("all_previous", count("mail").over(w))

But this satisfies conditions 1 and 3, not the second one... I have to add something to include this second condition, comparing date2 to my date1...

Upvotes: 0

Views: 214

Answers (1)

Leo C

Reputation: 22449

Using a generalized Window spec, with last(date1) standing for the current date1 within each Window partition, and a sum over 0s and 1s as a conditional count, here's how I would incorporate your condition #2 into the counting criteria:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

def days(n: Long): Long = n * 24 * 60 * 60

val w = Window.partitionBy("mail").orderBy($"date1".cast("long"))
val w1 = w.rangeBetween(days(-60), days(0))
val w2 = w.rangeBetween(days(-60), days(-1))

df.withColumn("all_previous", sum(
      when($"date2".cast("long") < last($"date1").over(w1).cast("long"), 1).
        otherwise(0)
    ).over(w2)
  ).na.fill(0).
  show
// +---+-------------------+-------------------+------------+------------+
// | id|              date1|              date2|        mail|all_previous|
// +---+-------------------+-------------------+------------+------------+
// |  3|2017-06-28 10:31:42|2017-02-28 20:31:42|boulanger.fr|           0|
// |  1|2017-06-29 10:53:53|2017-06-25 15:00:53|boulanger.fr|           1|
// |  5|2017-07-28 11:22:42|2017-05-28 11:22:42|boulanger.fr|           2|
// |  7|2017-08-24 16:08:07|2017-08-22 16:08:07|boulanger.fr|           3|
// |  9|2017-09-04 14:35:38|2017-07-04 07:30:25|boulanger.fr|           2|
// |  2|2017-07-05 10:48:57|2017-09-05 09:00:53|patissier.fr|           0|
// |  4|2017-08-21 17:31:12|2017-10-21 10:29:12|patissier.fr|           0|
// |  6|2017-08-23 17:03:43|2017-07-23 09:03:43|patissier.fr|           0|
// |  8|2017-08-31 17:20:43|2017-05-22 17:05:43|patissier.fr|           1|
// | 10|2017-09-07 15:10:34|2017-07-29 12:10:34|patissier.fr|           2|
// +---+-------------------+-------------------+------------+------------+

[UPDATE]

This solution is incorrect, even though the result appears correct on the sample dataset. In particular, last($"date1").over(w1) did not work as intended. The answer is kept in the hope that it serves as a lead toward a working solution.
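As one such lead: whatever Window formulation is attempted, it can be cross-checked against a plain-Scala brute force of the three conditions from the question. The sketch below (no Spark; the `ts` helper and hard-coded rows are only for checking) reproduces the all_previous column shown above on the sample data:

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
def ts(s: String): LocalDateTime = LocalDateTime.parse(s, fmt)

// (id, date1, date2, mail) with the parsed timestamps from the table
val rows = Seq(
  (1,  ts("2017-06-29 10:53:53"), ts("2017-06-25 15:00:53"), "boulanger.fr"),
  (2,  ts("2017-07-05 10:48:57"), ts("2017-09-05 09:00:53"), "patissier.fr"),
  (3,  ts("2017-06-28 10:31:42"), ts("2017-02-28 20:31:42"), "boulanger.fr"),
  (4,  ts("2017-08-21 17:31:12"), ts("2017-10-21 10:29:12"), "patissier.fr"),
  (5,  ts("2017-07-28 11:22:42"), ts("2017-05-28 11:22:42"), "boulanger.fr"),
  (6,  ts("2017-08-23 17:03:43"), ts("2017-07-23 09:03:43"), "patissier.fr"),
  (7,  ts("2017-08-24 16:08:07"), ts("2017-08-22 16:08:07"), "boulanger.fr"),
  (8,  ts("2017-08-31 17:20:43"), ts("2017-05-22 17:05:43"), "patissier.fr"),
  (9,  ts("2017-09-04 14:35:38"), ts("2017-07-04 07:30:25"), "boulanger.fr"),
  (10, ts("2017-09-07 15:10:34"), ts("2017-07-29 12:10:34"), "patissier.fr"))

// For each row, count the OTHER rows with the same mail whose date1 falls in
// [date1 - 60 days, date1 - 1 day] (inclusive, matching rangeBetween) and
// whose date2 is strictly before this row's date1.
val counts: Map[Int, Int] = rows.map { case (id, d1, _, mail) =>
  val lo = d1.minusDays(60)
  val hi = d1.minusDays(1)
  id -> rows.count { case (oid, od1, od2, omail) =>
    oid != id && omail == mail &&
      !od1.isBefore(lo) && !od1.isAfter(hi) && od2.isBefore(d1)
  }
}.toMap
// counts(5) == 2, counts(7) == 3, counts(9) == 2, counts(8) == 1, counts(10) == 2
```

This is O(n²) and only meant as an oracle on small samples, not a replacement for a Window-based (or join-based) Spark solution.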

Upvotes: 1
