Reputation: 65
I have the following JSON data coming from RabbitMQ:
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:30","data":{"RunStatus":1"}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:35","data":{"RunStatus":3"}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:40","data":{"RunStatus":2"}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:45","data":{"RunStatus":3"}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:50","data":{"RunStatus":2"}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:35","data":{"RunStatus":1"}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:45","data":{"RunStatus":3"}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:50","data":{"RunStatus":2"}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:55","data":{"RunStatus":3"}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:22:00","data":{"RunStatus":2"}}
I am trying to get the duration the device spent in each RunStatus. For the data above, device MACH-101 would break down like this:

- RunStatus 1: 5 seconds (15:21:30 → 15:21:35)
- RunStatus 2: 5 seconds (15:21:40 → 15:21:45)
- RunStatus 3: 10 seconds (15:21:35 → 15:21:40 plus 15:21:45 → 15:21:50)

The same logic applies to the second device's data.
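Just to make the expected computation concrete, here is the same interval logic in plain Java, outside Spark (the class name and the hard-coded arrays are only illustrative): each record's status lasts until the next record's timestamp, and the last record per device contributes nothing.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StatusDurations {
    // Each row: an epoch-second timestamp and a RunStatus,
    // already sorted by time for a single device.
    static Map<Integer, Long> durations(long[] times, int[] statuses) {
        Map<Integer, Long> totals = new LinkedHashMap<>();
        // The status at row i lasts until the timestamp of row i + 1;
        // the last row has no successor, so it adds nothing.
        for (int i = 0; i < times.length - 1; i++) {
            totals.merge(statuses[i], times[i + 1] - times[i], Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // MACH-101 sample: second offsets 30,35,40,45,50 with statuses 1,3,2,3,2
        long[] t = {30, 35, 40, 45, 50};
        int[] s = {1, 3, 2, 3, 2};
        System.out.println(durations(t, s)); // prints {1=5, 3=10, 2=5}
    }
}
```

This reproduces the totals above: status 1 for 5 seconds, status 2 for 5 seconds, status 3 for 10 seconds.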
Below is the Apache Spark SQL query I am attempting, but I am not getting the desired result. Please suggest some alternatives; I don't mind doing it in a non-SQL fashion also.
public static void main(String[] args) {
    try {
        mconf = new SparkConf();
        mconf.setAppName("RabbitMqReceiver");
        mconf.setMaster("local[*]");
        jssc = new JavaStreamingContext(mconf, Durations.seconds(10));

        SparkSession spksess = SparkSession
                .builder()
                .master("local[*]")
                .appName("RabbitMqReceiver2")
                .getOrCreate();
        SQLContext sqlctxt = new SQLContext(spksess);

        JavaDStream<String> strmData =
                jssc.receiverStream(new mqreceiver(StorageLevel.MEMORY_AND_DISK_2()));
        JavaDStream<String> machineData =
                strmData.window(Durations.minutes(1), Durations.seconds(10));

        sqlctxt.udf().register("custdatediff", new UDF2<String, String, String>() {
            @Override
            public String call(String argdt1, String argdt2) {
                DateTimeFormatter formatter = DateTimeFormat.forPattern("dd-MM-yyyy HH:mm:ss");
                DateTime dt1 = formatter.parseDateTime(argdt1);
                DateTime dt2 = formatter.parseDateTime(argdt2);
                Seconds retsec = org.joda.time.Seconds.secondsBetween(dt2, dt1);
                // Seconds.toString() yields the ISO-8601 form, e.g. "PT5S"
                return retsec.toString();
            }
        }, DataTypes.StringType);

        machineData.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> rdd) {
                if (!rdd.isEmpty()) {
                    Dataset<Row> df = sqlctxt.jsonRDD(rdd);
                    df.createOrReplaceTempView("DeviceData");
                    // I DON'T WANT to group by timestamp, but the query requires that I pass it.
                    Dataset<Row> searchResult = sqlctxt.sql("select t1.DeviceId, t1.data.runstatus,"
                            + " custdatediff(CAST(t1.timestamp AS STRING), CAST(t2.timestamp AS STRING)) as duration"
                            + " from DeviceData t1"
                            + " join DeviceData t2 on t1.DeviceId = t2.DeviceId"
                            + " group by t1.DeviceId, t1.data.runstatus, t1.timestamp, t2.timestamp");
                    searchResult.show();
                }
            }
        });

        jssc.start();
        jssc.awaitTermination();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
A sample result from the above code / SQL execution is below:
+--------+---------+--------+
|DeviceId|runstatus|duration|
+--------+---------+--------+
| NTC-167| 2| PT0S|
| NTC-168| 2| PT0S|
| NTC-168| 2| PT-10S|
| NTC-168| 2| PT-15S|
| NTC-168| 1| PT10S|
| NTC-168| 1| PT0S|
| NTC-168| 1| PT-5S|
| NTC-168| 1| PT15S|
| NTC-168| 1| PT5S|
| NTC-168| 1| PT0S|
+--------+---------+--------+
So you can see that statuses repeat, and only one of the repeated rows has the correct result. The query I have written forces me to group by timestamp as well; I suspect the results might be correct if I could avoid grouping by timestamp, but I'm not sure about that.
Upvotes: 0
Views: 793
Reputation: 72
You can try a DataFrame with a window function. Using `lead` over a window, you can compare the current row's timestamp with the next row's timestamp and compute the difference for each device and run status. Like below:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lead

val windowSpec_wk = Window.partitionBy(df1("DeviceId")).orderBy(df1("timestamp"))
val df2 = df1.withColumn("period", lead(df1("timestamp"), 1).over(windowSpec_wk))
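Continuing from `df2` (a sketch, not tested end to end: it assumes `timestamp` has already been parsed into a Spark timestamp column and that the nested `data.RunStatus` has been flattened into a `runstatus` column; the names `df3` and `result` are only illustrative), the per-row duration is the gap to the next row, and a `groupBy` then sums it per device and status:

```scala
import org.apache.spark.sql.functions.{unix_timestamp, sum}

// Gap in seconds between this row and the next row of the same device;
// the last row per device has a null "period" and drops out of the sum.
val df3 = df2.withColumn(
  "duration",
  unix_timestamp(df2("period")) - unix_timestamp(df2("timestamp")))

val result = df3.groupBy("DeviceId", "runstatus")
                .agg(sum("duration").as("total_seconds"))
```

Because the window is partitioned per device and ordered by timestamp, no self-join or grouping by timestamp is needed.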
Upvotes: 1