Ananda-SteelTownBoy

Reputation: 611

Apache Spark: how to get the time intervals of a positive signal

I have sensor data that reports a signal at frequent intervals, as shown below. I am trying to find the time intervals during which the signal is positive (signal = 1).

Input:

 signal| timestamp
 ----------------------------
 0     | 2016-07-02 12:01:40
 1     | 2016-07-02 12:21:23
 1     | 2016-07-02 13:22:56
 1     | 2016-07-02 13:27:07
 0     | 2016-07-02 13:30:12
 0     | 2016-07-02 13:40:34
 1     | 2016-07-02 13:57:07
 1     | 2016-07-02 14:08:07

Expected output:

start_time          | end_time            | positive_count
2016-07-02 12:21:23 | 2016-07-02 13:27:07 | 3
2016-07-02 13:57:07 | 2016-07-02 14:08:07 | 2

I tried to use the map() function to compute the time intervals, but it did not work: map() processes one row at a time, so I cannot compare a row with its neighbours.

Upvotes: 1

Views: 1087

Answers (1)

BlueSheepToken

Reputation: 6099

This is doable with Window functions.

The overall idea is to define a column "id_signal" that identifies each run of consecutive positive signals, then groupBy on it and take the min timestamp, the max timestamp and the count.

Ideally, we want a DataFrame like this:

+------+-------------------+---------+
|signal|          timestamp|id_signal|
+------+-------------------+---------+
|     1|2016-07-02 12:21:23|        1|
|     1|2016-07-02 13:22:56|        1|
|     1|2016-07-02 13:27:07|        1|
|     1|2016-07-02 13:57:07|        2|
|     1|2016-07-02 14:08:07|        2|
+------+-------------------+---------+

Okay, let us start.

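// Imports. spark-shell already provides spark.implicits._ (needed for toDF and $);
// in a standalone application, add: import spark.implicits._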
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.expressions.Window

// The data
val df = Seq((0, "2016-07-02 12:01:40"),
 (1, "2016-07-02 12:21:23"),
 (1, "2016-07-02 13:22:56"),
 (1, "2016-07-02 13:27:07"),
 (0, "2016-07-02 13:30:12"),
 (0, "2016-07-02 13:40:34"),
 (1, "2016-07-02 13:57:07"),
 (1, "2016-07-02 14:08:07")).
toDF("signal", "timestamp").
withColumn("timestamp", F.to_timestamp($"timestamp", "yyyy-MM-dd HH:mm:ss"))

To create an id for each run of positive signals, we first add a column that is 1 when a row starts a new run and 0 otherwise.

A row starts a new run when the previous row has signal 0 and the current row has signal 1.

Let's do a window function with a lag for this.

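// Without partitionBy, Spark moves all rows to a single partition to evaluate this window.
val newSignalWindow = Window.orderBy("timestamp")

// lag(..., 1, 0) returns the previous row's signal, or 0 for the first row.
val dfWithNewSignal = df.withColumn(
 "new_signal",
 (F.lag($"signal", 1, 0).over(newSignalWindow) === 0 && $"signal" === 1).cast("bigint"))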
dfWithNewSignal.show
/*
+------+-------------------+----------+
|signal|          timestamp|new_signal|
+------+-------------------+----------+
|     0|2016-07-02 12:01:40|         0|
|     1|2016-07-02 12:21:23|         1|
|     1|2016-07-02 13:22:56|         0|
|     1|2016-07-02 13:27:07|         0|
|     0|2016-07-02 13:30:12|         0|
|     0|2016-07-02 13:40:34|         0|
|     1|2016-07-02 13:57:07|         1|
|     1|2016-07-02 14:08:07|         0|
+------+-------------------+----------+
*/
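
For readers who want to check the comparison logic without a Spark session, here is a minimal sketch of the same idea on a plain Scala collection. The names SignalFlags and newSignalFlags are made up for this illustration, and the input is assumed to be already sorted by timestamp.

```scala
// Plain-Scala sketch of the lag comparison (no Spark needed).
// SignalFlags and newSignalFlags are hypothetical names used only for illustration.
object SignalFlags {
  // signals must already be ordered by timestamp.
  def newSignalFlags(signals: Seq[Int]): Seq[Int] =
    // Prepending 0 pairs every element with its predecessor,
    // using 0 for the first element, like lag(signal, 1, 0).
    (0 +: signals).zip(signals).map {
      case (0, 1) => 1 // previous is 0 and current is 1: a new run starts here
      case _      => 0
    }
}
```

Calling SignalFlags.newSignalFlags on the signal column of the example data gives the same flags as the new_signal column above.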

Now we keep only the positive rows and take a cumulative sum of new_signal, which gives every run of positive signals its own id:

val dfWithIdSignal = dfWithNewSignal.
filter($"signal" === 1).
withColumn("id_signal", F.sum("new_signal").over(newSignalWindow)).
drop("new_signal")
dfWithIdSignal.show
/*
+------+-------------------+---------+
|signal|          timestamp|id_signal|
+------+-------------------+---------+
|     1|2016-07-02 12:21:23|        1|
|     1|2016-07-02 13:22:56|        1|
|     1|2016-07-02 13:27:07|        1|
|     1|2016-07-02 13:57:07|        2|
|     1|2016-07-02 14:08:07|        2|
+------+-------------------+---------+
*/
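
The cumulative sum works as an id because the running total only increases on rows where a new run starts. A small plain-Scala sketch of that step, assuming the flags of the filtered rows are given in timestamp order (RunIds and runIds are hypothetical names):

```scala
// Plain-Scala sketch of the cumulative-sum step (no Spark needed).
// RunIds and runIds are hypothetical names used only for illustration.
object RunIds {
  // flags: the new_signal values of the rows kept by the filter (signal == 1),
  // in timestamp order.
  def runIds(flags: Seq[Int]): Seq[Int] =
    // scanLeft yields the running totals preceded by the initial 0; tail drops that 0.
    flags.scanLeft(0)(_ + _).tail
}
```

For the five positive rows of the example, the flags 1, 0, 0, 1, 0 become the ids 1, 1, 1, 2, 2, matching the id_signal column above.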

Finally, a simple groupBy and we are done:

val resultDf = dfWithIdSignal.
groupBy("id_signal").
agg(
 F.min("timestamp").as("start_time"),
 F.max("timestamp").as("end_time"),
 F.count("*").as("positive_count")).
drop("id_signal").
orderBy("start_time") // groupBy does not guarantee row order

resultDf.show

Here is our result:

+-------------------+-------------------+--------------+
|         start_time|           end_time|positive_count|
+-------------------+-------------------+--------------+
|2016-07-02 12:21:23|2016-07-02 13:27:07|             3|
|2016-07-02 13:57:07|2016-07-02 14:08:07|             2|
+-------------------+-------------------+--------------+

Upvotes: 4
