I have sensor data that emits a signal frequently, as shown below. I am trying to find the time intervals of positive signal, where signal = 1.
Input:
signal | timestamp
-------+---------------------
     0 | 2016-07-02 12:01:40
     1 | 2016-07-02 12:21:23
     1 | 2016-07-02 13:22:56
     1 | 2016-07-02 13:27:07
     0 | 2016-07-02 13:30:12
     0 | 2016-07-02 13:40:34
     1 | 2016-07-02 13:57:07
     1 | 2016-07-02 14:08:07
Expected output:
start_time          | end_time            | positive_count
2016-07-02 12:21:23 | 2016-07-02 13:27:07 | 3
2016-07-02 13:57:07 | 2016-07-02 14:08:07 | 2
I have tried using the map() function to get the time intervals, but couldn't succeed, since map() processes one row at a time and cannot access the values of neighbouring rows.
This is doable with Window functions.
The general idea is to define a column id_signal, group by it, and take the min timestamp, max timestamp, and count per group. In other words, we would like a DataFrame like this:
+------+-------------------+---------+
|signal| timestamp|id_signal|
+------+-------------------+---------+
| 1|2016-07-02 12:21:23| 1|
| 1|2016-07-02 13:22:56| 1|
| 1|2016-07-02 13:27:07| 1|
| 1|2016-07-02 13:57:07| 2|
| 1|2016-07-02 14:08:07| 2|
+------+-------------------+---------+
Okay, let us start.
// Some imports
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.expressions.Window
import spark.implicits._ // needed for toDF and the $"..." column syntax

// The data
val df = Seq(
  (0, "2016-07-02 12:01:40"),
  (1, "2016-07-02 12:21:23"),
  (1, "2016-07-02 13:22:56"),
  (1, "2016-07-02 13:27:07"),
  (0, "2016-07-02 13:30:12"),
  (0, "2016-07-02 13:40:34"),
  (1, "2016-07-02 13:57:07"),
  (1, "2016-07-02 14:08:07")
).toDF("signal", "timestamp").
  // Parse the string column into a proper TimestampType column
  withColumn("timestamp", F.to_timestamp($"timestamp", "yyyy-MM-dd HH:mm:ss"))
The idea behind creating an id for each signal is to first build a column containing 1 when a row starts a new signal and 0 otherwise. For this we need to check whether the previous row has 0 in the signal column while the current row has 1. Let's use a window function with lag for this.
// Order the whole dataset by timestamp (no partitionBy, so all rows
// end up in a single partition -- fine for data of this size)
val newSignalWindow = Window.orderBy("timestamp")

// new_signal is 1 when the previous signal is 0 (or there is no previous
// row, thanks to lag's default value of 0) and the current signal is 1
val dfWithNewSignal = df.withColumn(
  "new_signal",
  (F.lag($"signal", 1, 0).over(newSignalWindow) === 0 && $"signal" === 1).cast("bigint")
)
dfWithNewSignal.show
/*
+------+-------------------+----------+
|signal| timestamp|new_signal|
+------+-------------------+----------+
| 0|2016-07-02 12:01:40| 0|
| 1|2016-07-02 12:21:23| 1|
| 1|2016-07-02 13:22:56| 0|
| 1|2016-07-02 13:27:07| 0|
| 0|2016-07-02 13:30:12| 0|
| 0|2016-07-02 13:40:34| 0|
| 1|2016-07-02 13:57:07| 1|
| 1|2016-07-02 14:08:07| 0|
+------+-------------------+----------+
*/
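As an aside, Window.orderBy without a partitionBy moves all rows into a single partition (Spark logs a warning about this). That is fine for one sensor, but if the data contained several sensors, say with a hypothetical device_id column that is not in the question, you would partition the window per device so each sensor's runs are computed independently and in parallel:
// Hypothetical variant: device_id is an assumed column, not in the original data
val perDeviceWindow = Window.partitionBy("device_id").orderBy("timestamp")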
Okay, now we can simply filter on signal = 1 and take a cumulative sum of new_signal to give a unique id to each run of positive signals:
val dfWithIdSignal = dfWithNewSignal.
  filter($"signal" === 1).
  // Cumulative sum: every row in the same run of consecutive 1s gets the same id
  withColumn("id_signal", F.sum("new_signal").over(newSignalWindow)).
  drop("new_signal")
dfWithIdSignal.show
/*
+------+-------------------+---------+
|signal| timestamp|id_signal|
+------+-------------------+---------+
| 1|2016-07-02 12:21:23| 1|
| 1|2016-07-02 13:22:56| 1|
| 1|2016-07-02 13:27:07| 1|
| 1|2016-07-02 13:57:07| 2|
| 1|2016-07-02 14:08:07| 2|
+------+-------------------+---------+
*/
And then a simple groupBy and we are done:
val resultDf = dfWithIdSignal.
  groupBy("id_signal").
  agg(
    F.min("timestamp").as("start_date"),
    F.max("timestamp").as("end_date"),
    F.count("*").as("positive_count")
  ).
  drop("id_signal")
resultDf.show
Here is our result:
+-------------------+-------------------+--------------+
| start_date| end_date|positive_count|
+-------------------+-------------------+--------------+
|2016-07-02 12:21:23|2016-07-02 13:27:07| 3|
|2016-07-02 13:57:07|2016-07-02 14:08:07| 2|
+-------------------+-------------------+--------------+
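For reference, the same pipeline can also be written as a single Spark SQL query. This is just a sketch of an equivalent formulation, assuming the spark session from spark-shell and the df defined above; the DataFrame version is the answer proper:
// Register the input so it can be queried with SQL
df.createOrReplaceTempView("signals")

val resultSqlDf = spark.sql("""
  SELECT MIN(timestamp) AS start_date,
         MAX(timestamp) AS end_date,
         COUNT(*)       AS positive_count
  FROM (
    -- WHERE is applied before the window function, so the cumulative
    -- sum only runs over the rows where signal = 1
    SELECT timestamp,
           SUM(new_signal) OVER (ORDER BY timestamp) AS id_signal
    FROM (
      SELECT signal, timestamp,
             CAST(LAG(signal, 1, 0) OVER (ORDER BY timestamp) = 0
                  AND signal = 1 AS BIGINT) AS new_signal
      FROM signals
    ) flagged
    WHERE signal = 1
  ) grouped
  GROUP BY id_signal
""")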