Himanshu Yadav

Reputation: 13585

Spark: Complex operation with dataframes

I have the input dataset in the following format:

+---+--------+----------+
| id|   refId| timestamp|
+---+--------+----------+
|  1|    null|1548944642|
|  1|29950529|1548937685|
|  2|27510720|1548944885|
|  2|27510720|1548943617|
+---+--------+----------+

A new column session needs to be added with the following transformation logic:

  1. If refId is null, the session value is true.
  2. If id and refId are unique, the session value is true.
  3. If id and refId are not unique and the timestamp is greater than that of the previous row, the session value is true; the difference between the timestamps should also be > 60.

Expected output:
+---+--------+-------+----------+
| id|   refId|session| timestamp|
+---+--------+-------+----------+
|  1|    null|   true|1548944642|
|  1|29950529|   true|1548937685|
|  2|27510720|  false|1548943617|
|  2|27510720|   true|1548944885|
+---+--------+-------+----------+
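The three rules can be sanity-checked outside Spark with a plain-Python reference implementation; `add_session` is a hypothetical helper written only to illustrate the intended logic, not Spark code:

```python
from collections import Counter

def add_session(rows, gap=60):
    """Apply the three session rules to a list of plain dicts.

    rows: list of {"id", "refId", "timestamp"} dicts.
    Returns the rows (sorted per (id, refId) group by timestamp)
    with a boolean "session" field added.
    """
    # Count each (id, refId) pair so rule 2 ("unique") can be tested.
    counts = Counter((r["id"], r["refId"]) for r in rows)
    out = []
    # Process duplicates in timestamp order so "previous row" is well defined.
    for r in sorted(rows, key=lambda r: (r["id"], str(r["refId"]), r["timestamp"])):
        key = (r["id"], r["refId"])
        if r["refId"] is None:          # rule 1: null refId
            session = True
        elif counts[key] == 1:          # rule 2: (id, refId) occurs once
            session = True
        else:                           # rule 3: later timestamp, gap > 60
            prev = next((p["timestamp"] for p in reversed(out)
                         if (p["id"], p["refId"]) == key), None)
            session = prev is not None and r["timestamp"] - prev > gap
        out.append({**r, "session": session})
    return out
```

Running it over the four sample rows reproduces the session flags shown in the expected output above.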

I am able to handle conditions 1 and 3 separately, but not condition 2.

  1. Condition 1:

data.withColumn("session", functions.when(data.col("refId").isNull(), true));

  2. Condition 3:

WindowSpec w = Window.partitionBy("id", "refId").orderBy(timestampDS.col("timestamp"));
functions.coalesce(timestampDS.col("timestamp").cast("long").$minus(functions.lag("timestamp", 1).over(w).cast("long")), functions.lit(0));

My question is how to fulfill the 2nd condition and implement all three transformations together.

Upvotes: 0

Views: 347

Answers (2)

firsni

Reputation: 906

You can use a window function to partition by id and refId and order by timestamp, then add a rank column. Finally, add the session column with the when/otherwise SQL functions.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{when, col, rank, lit, lag}
val win = Window.partitionBy("id", "refId").orderBy("timestamp")
val result = df
      .withColumn("previous", lag("timestamp", 1) over win)
      .withColumn("rank", rank() over win)
      .withColumn("session",
        when(col("refId").isNull || col("rank") === lit(1), true)
          .otherwise(false)
      )
      .withColumn("diff", col("timestamp") - col("previous"))

Upvotes: 1

Subhasish Guha

Reputation: 232

I would suggest using Spark SQL for less complexity; it achieves your result easily.

df.createOrReplaceTempView("test")

spark.sql("""
  SELECT id, refId, timestamp,
         CASE WHEN refId IS NULL AND id IS NOT NULL THEN 'true'
              WHEN id IS NOT NULL AND refId IS NOT NULL AND rank = 1 THEN 'true'
              ELSE 'false'
         END AS session
  FROM (SELECT id, refId, timestamp,
               rank() OVER (PARTITION BY id, refId ORDER BY timestamp DESC) AS rank
        FROM test) c
""").show()

The output looks like this:

+---+--------+----------+-------+
| id|   refId| timestamp|session|
+---+--------+----------+-------+
|  1|    null|1548944642|   true|
|  1|29950529|1548937685|   true|
|  2|27510720|1548944885|   true|
|  2|27510720|1548943617|  false|
+---+--------+----------+-------+ 
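The same windowed CASE logic can be sanity-checked outside Spark, for example with Python's built-in sqlite3 module (window functions require SQLite ≥ 3.25). The alias `rnk` is used instead of `rank` only to avoid shadowing the function name; note that, as in the query above, the > 60 gap from rule 3 is not checked:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE test (id INTEGER, refId INTEGER, timestamp INTEGER)")
con.executemany("INSERT INTO test VALUES (?, ?, ?)", [
    (1, None,     1548944642),
    (1, 29950529, 1548937685),
    (2, 27510720, 1548944885),
    (2, 27510720, 1548943617),
])

# Same shape as the Spark SQL query: rank each (id, refId) group by
# timestamp descending, then flag sessions with a CASE expression.
rows = con.execute("""
    SELECT id, refId, timestamp,
           CASE WHEN refId IS NULL AND id IS NOT NULL THEN 'true'
                WHEN id IS NOT NULL AND refId IS NOT NULL AND rnk = 1 THEN 'true'
                ELSE 'false'
           END AS session
    FROM (SELECT id, refId, timestamp,
                 rank() OVER (PARTITION BY id, refId ORDER BY timestamp DESC) AS rnk
          FROM test) c
    ORDER BY id, timestamp
""").fetchall()
```

Each tuple in `rows` carries the same session flag as the table above.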

Upvotes: 1
