Reputation: 529
I have a stream of data coming from IoT devices that have an id (uuid) and a quantity (ie. temperature).
I'd like to keep a count of events received in the last 15 minutes, with a sliding window of, say, 1 or 5 minutes.
I have implemented the following in Spark, but it generates all the windows, while I'm only interested in the most recent one (and possibly a zero count if a device has not sent any data in the meantime):
import org.apache.spark.sql.functions._
val agg15min = stream
  .withWatermark("createdAtTimestamp", "15 minutes")
  .where("device_uuid is not null")
  .groupBy($"device_uuid", window($"createdAtTimestamp", "15 minutes", "5 minutes"))
  .count()
I have tried filtering the data afterwards like this:
val query15min = agg15min
  .writeStream
  .format("memory")
  .queryName("query15min")
  .outputMode("complete")
  .start()
and then:
val df15min = spark.sql("""
with cte as (
select
device_uuid,
date_format(window.end, "MMM-dd HH:mm") as time,
rank() over (partition by device_uuid order by window.end desc) as rank,
count
from query15min
)
select
device_uuid,
count
from cte
where rank = 1""")
But the documentation says the memory sink is not meant for production use, and this approach is also rather inefficient.
Is there an efficient way to implement this kind of logic in Spark Structured Streaming?
Upvotes: 0
Views: 2323
Reputation: 34
Workaround:
val query15min = agg15min
  // keep only the window whose start is exactly 15 minutes before the current
  // 5-minute boundary, i.e. the window that ends at the current boundary
  .where("string((int(unix_timestamp()/300)*300 - int(window.start)) / (15 * 60)) == '1.0'")
  .writeStream...
or
val query15min = agg15min
  // equivalent check: the window ends exactly at the current 5-minute boundary
  .where("(int(unix_timestamp()/300)*300) == int(window.end)")
  .writeStream...
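For readability, the same condition can also be written with the Column API instead of a SQL string. This is only a sketch of the second variant, assuming the agg15min aggregation and the 5-minute slide from the question:

import org.apache.spark.sql.functions._

// "now" rounded down to the 5-minute slide interval must equal the window end
val latest15min = agg15min
  .where(
    (floor(unix_timestamp(current_timestamp()) / 300) * 300) ===
      col("window.end").cast("long")
  )

latest15min can then be passed to writeStream exactly like agg15min above.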
Upvotes: 0
Reputation: 772
Yes, the memory sink shouldn't be used here, as it is meant for debugging; it also brings all the data to the driver node of Spark. A more efficient way would be to write the stream output (writeStream) to an HDFS path as files (Parquet, for example), then read that path back in a Spark session and run the query at regular intervals.
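A minimal sketch of that approach, assuming the agg15min aggregation from the question; the paths and the rank-based "latest window" query below are only examples:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Stream the windowed counts to Parquet files; file sinks require append mode,
// so each window is written once the watermark allows it to be finalized.
val fileQuery15min = agg15min
  .writeStream
  .format("parquet")
  .option("path", "/data/agg15min")                       // example output directory
  .option("checkpointLocation", "/checkpoints/agg15min")  // example checkpoint location
  .outputMode("append")
  .start()

// In a separate batch job run at regular intervals, read the files back
// and keep only the most recent window per device.
val latestPerDevice = spark.read.parquet("/data/agg15min")
  .withColumn("rank", rank().over(
    Window.partitionBy("device_uuid").orderBy(col("window.end").desc)))
  .where(col("rank") === 1)
  .select("device_uuid", "count")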
Upvotes: 1