I have data coming in on Kafka from IoT devices. The sensor timestamps from these devices are often out of sync due to network congestion, devices being out of range, etc.
We have to write streaming jobs that aggregate sensor values over a time window for each device independently. With a groupBy aggregation with a watermark, we lose the data of all devices that lag behind the device with the latest timestamp, because the watermark is computed from the latest event time seen across all devices.
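To make the problem concrete, here is a small plain-Scala model (no Spark involved) that contrasts a single query-wide watermark with one watermark per device. As a simplification it assumes timestamps are epoch seconds and that a reading older than the applicable watermark is the one at risk of being dropped; `Reading`, `WatermarkDemo` and the sample values are hypothetical.

```scala
// Plain-Scala model (no Spark): one global watermark vs. one watermark per device.
// Hypothetical record shape; timestamps are epoch seconds.
case class Reading(deviceId: String, eventTime: Long, value: Double)

object WatermarkDemo {
  // Query-wide watermark: latest event time across ALL devices, minus the delay.
  def globalWatermark(seen: Seq[Reading], delay: Long): Long =
    seen.map(_.eventTime).max - delay

  // Hypothetical alternative: each device's watermark comes from its own latest reading.
  def perDeviceWatermarks(seen: Seq[Reading], delay: Long): Map[String, Long] =
    seen.groupBy(_.deviceId).map { case (id, rs) => id -> (rs.map(_.eventTime).max - delay) }

  // Readings behind the global watermark (at risk of being dropped).
  def lateGlobal(batch: Seq[Reading], wm: Long): Seq[Reading] =
    batch.filter(_.eventTime < wm)

  // Readings behind their own device's watermark.
  def latePerDevice(batch: Seq[Reading], wms: Map[String, Long]): Seq[Reading] =
    batch.filter(r => wms.get(r.deviceId).exists(wm => r.eventTime < wm))
}

// Device "b" was out of range, so its latest event time is far behind device "a".
val seen = Seq(Reading("a", 10000L, 1.0), Reading("b", 1000L, 2.0))
val nextBatch = Seq(Reading("a", 10050L, 1.5), Reading("b", 1100L, 2.5), Reading("b", 1200L, 3.0))
```

With a 600-second delay the global watermark is 10000 - 600 = 9400, so both of device `b`'s new readings fall behind it, while `b`'s own watermark (1000 - 600 = 400) would keep them.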
Is there any way that the watermarks could be applied separately to each device based on the latest timestamp for that device, and not the latest timestamp across all devices?
We cannot just use a large delay threshold, because a device could be out of range for days. We also cannot run an individual query per device, because the number of devices is high.
Would it be achievable using flatMapGroupsWithState? Or is this something that cannot be achieved with Spark Structured Streaming at all?
I think instead of watermarking by event timestamp (which can lag behind, as you said), you could apply a watermark over the processing timestamp, i.e. the time when your Spark job processes the data. I faced a very similar problem in a recent project, and that's how I solved it.
Example:
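```scala
import org.apache.spark.sql.functions.current_timestamp
// Watermark on processing time rather than on the device's (possibly lagging) event time.
val dfWithWatermark = df
  .withColumn("processingTimestamp", current_timestamp())
  .withWatermark("processingTimestamp", "1 day")
// ... use dfWithWatermark to do aggregations etc.
```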
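This keeps state for one day of processing time, no matter what event timestamps the incoming IoT data carries. Keep in mind that Spark only uses a watermark to clean up aggregation state when the aggregation groups on the watermarked column (or on a window over it), so the windows here are defined by processingTimestamp, i.e. by arrival time rather than by the time the device measured the value.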
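A rough plain-Scala model of what the processing-time approach does, assuming the aggregation groups by device and by a tumbling window over `processingTimestamp`: readings are bucketed by when they arrive, so a device's lagging event time no longer matters, and in this model a window is final once processing time has passed its end plus the 1-day delay. It ignores Spark's micro-batch mechanics; `Arrival`, `ProcessingTimeWindows` and the 60-second window size are hypothetical.

```scala
// Plain-Scala model (no Spark) of watermarking on processing time.
// Hypothetical shape: the time a reading was measured (eventTime) and the time
// the job processed it (processingTime), both in epoch seconds.
case class Arrival(deviceId: String, eventTime: Long, processingTime: Long, value: Double)

object ProcessingTimeWindows {
  // Start of the tumbling window that contains timestamp t.
  def windowStart(t: Long, size: Long): Long = t - Math.floorMod(t, size)

  // Sum values per (device, processing-time window); eventTime is deliberately unused.
  def aggregate(rows: Seq[Arrival], size: Long): Map[(String, Long), Double] =
    rows
      .groupBy(r => (r.deviceId, windowStart(r.processingTime, size)))
      .map { case (key, rs) => key -> rs.map(_.value).sum }

  // Windows ending at or before (now - delay) are past the processing-time
  // watermark in this model, so their result is final and their state could be dropped.
  def finalized(aggs: Map[(String, Long), Double], size: Long, now: Long, delay: Long): Map[(String, Long), Double] =
    aggs.filter { case ((_, start), _) => start + size <= now - delay }
}

// Device "b" reports a very old event time but arrives at about the same time as device "a".
val arrivals = Seq(Arrival("a", 49990L, 50010L, 1.0), Arrival("b", 1000L, 50000L, 2.0))
val perWindow = ProcessingTimeWindows.aggregate(arrivals, 60L)
```

Both readings land in the window starting at 49980 because only arrival time is used. The trade-off is that the windows now describe when data arrived, not when it was measured.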
There are some limitations to this, of course: for example, devices that send data at intervals longer than your watermark delay. But to find a solution for that, you'd have to be more specific about your requirements.
By using flatMapGroupsWithState you can be very specific with your state keeping, but I don't think it's really necessary in your case.
Edit: if you do decide to go with flatMapGroupsWithState, you can use different timeouts per device group by calling state.setTimeoutDuration() with different intervals, depending on the type of device you process (this requires GroupStateTimeout.ProcessingTimeTimeout as the timeout configuration). This way you can be very specific about how long state is kept.
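To make the flatMapGroupsWithState route more concrete, here is a plain-Scala sketch of per-device watermark logic that such a callback could run. In Spark the function would receive the device key, an iterator of that device's rows and a `GroupState`, and persist its state with `state.update(...)`; here the state is passed in and returned explicitly so the logic runs and can be tested without Spark. It presumably assumes no global `withWatermark` on the input, so the per-device cutoff is entirely up to this code. `Reading`, `DeviceState`, `WindowResult`, the 5-minute window and the 600-second lateness allowance are all hypothetical, and timeouts for silent devices are left out.

```scala
// Plain-Scala sketch (no Spark) of per-device watermark logic in the shape of a
// flatMapGroupsWithState callback. All names and constants here are hypothetical.
case class Reading(deviceId: String, eventTime: Long, value: Double)

// This device's own latest event time, plus its still-open windows: start -> (sum, count).
case class DeviceState(maxEventTime: Option[Long], open: Map[Long, (Double, Int)])

case class WindowResult(deviceId: String, windowStart: Long, sum: Double, count: Int)

object PerDeviceWatermark {
  val windowSize = 300L // hypothetical 5-minute tumbling windows (seconds)
  val delay = 600L      // hypothetical lateness allowed relative to THIS device's latest reading

  def windowStart(t: Long): Long = t - Math.floorMod(t, windowSize)

  // In Spark the state would come from and go back to a GroupState[DeviceState];
  // here it is passed in and returned so the logic runs on its own.
  def update(
      deviceId: String,
      readings: Iterator[Reading],
      prev: Option[DeviceState]
  ): (Iterator[WindowResult], DeviceState) = {
    val batch = readings.toList
    val state = prev.getOrElse(DeviceState(None, Map.empty))

    // Watermark from this device's history only (before this batch);
    // other devices' timestamps never influence it.
    val watermark = state.maxEventTime.map(_ - delay)
    val accepted = batch.filter(r => watermark.forall(wm => r.eventTime >= wm))

    // Add accepted readings to their event-time windows.
    val open = accepted.foldLeft(state.open) { (acc, r) =>
      val w = windowStart(r.eventTime)
      val (sum, count) = acc.getOrElse(w, (0.0, 0))
      acc.updated(w, (sum + r.value, count + 1))
    }

    // Advance this device's watermark and emit the windows it has moved past.
    val newMax = (state.maxEventTime.toList ++ batch.map(_.eventTime)).reduceOption(_ max _)
    val newWatermark = newMax.map(_ - delay)
    val (closed, stillOpen) =
      open.partition { case (w, _) => newWatermark.exists(wm => w + windowSize <= wm) }
    val results = closed.toList.sortBy(_._1).map { case (w, (sum, count)) =>
      WindowResult(deviceId, w, sum, count)
    }
    (results.iterator, DeviceState(newMax, stillOpen))
  }
}
```

Feeding device `b` two batches shows the idea: the first batch (readings at 1000 and 1100) opens window 900; in the second, a reading at 400 is behind `b`'s own watermark (1100 - 600 = 500) and is ignored, while a reading at 2000 moves `b`'s watermark to 1400 and closes window 900. Another device reporting much later timestamps would not change any of this.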