Reputation: 1
I'm getting streaming sensor data from Kafka, and I need to do the following:
a. Check a variable's variations within a time period and raise an alarm if they are extreme (e.g. the temperature going from very low to very high within a 5-minute period). When I get a very low temperature, I set a 5-minute timer and watch for a very high temperature within those 5 minutes; if one arrives, I raise an alarm.
b. Conversely, if I get a very high temperature, I check whether a very low temperature arrives within the 5-minute window.
c. Compute the running average of the temperature every minute and push it to Kafka. This is a continuous activity, so I need a timer that starts at the beginning and fires every minute thereafter.
I don't understand how to use timers for all of these use cases in the same code. Any suggestions or advice?
Upvotes: 0
Views: 52
Reputation: 43707
Flink's tumbling time windows are a good match for case c, but not for cases a and b, because these windows cannot be aligned to a triggering event; they are always aligned to the clock (e.g., from 12:00 to 12:05). So if a high temperature reading arrives at 12:04 and the temperature drops sharply by 12:06, those two events will land in different windows.
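A minimal plain-Java sketch (not Flink code) of that clock alignment: with zero offset, the tumbling window containing a timestamp starts at timestamp - (timestamp % size), which to my understanding is what Flink's TimeWindow.getWindowStartWithOffset computes for non-negative timestamps. The two readings are the hypothetical 12:04 and 12:06 events from the example, expressed as milliseconds since midnight.

```java
import java.time.Duration;
import java.time.LocalTime;

class WindowAlignment {
    // Start of the tumbling window that contains timestampMs, for windows aligned
    // to the epoch with zero offset. Assumes a non-negative timestamp.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long fiveMinutes = Duration.ofMinutes(5).toMillis();
        // Hypothetical readings at 12:04 and 12:06, as milliseconds since midnight.
        long[] readings = {
            LocalTime.of(12, 4).toSecondOfDay() * 1000L,
            LocalTime.of(12, 6).toSecondOfDay() * 1000L
        };
        for (long ts : readings) {
            LocalTime at = LocalTime.ofSecondOfDay(ts / 1000);
            LocalTime start = LocalTime.ofSecondOfDay(windowStart(ts, fiveMinutes) / 1000);
            System.out.println(at + " falls in the window starting at " + start);
        }
    }
}
```

Running main reports that 12:04 falls in the window starting at 12:00 and 12:06 in the one starting at 12:05, so no single 5-minute window sees both readings.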
For a and b I would instead suggest using an interval join from the DataStream API, or a time-windowed join using the Table or SQL API. Something like this (time is a reserved keyword in Flink SQL, hence the backticks):
SELECT *
FROM events e1, events e2
WHERE e1.id = e2.id
  AND e2.`time` BETWEEN e1.`time` AND e1.`time` + INTERVAL '5' MINUTE
  AND ABS(e1.temp - e2.temp) > 50
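To make the query's semantics concrete, here is a brute-force plain-Java sketch of the same predicate over an in-memory list. It is not how Flink executes an interval join (Flink keeps only the state that the 5-minute bound requires rather than comparing all pairs), and the Reading class, its fields, the threshold and the sample data are hypothetical. Because of ABS, the single condition covers both the low-to-high case (a) and the high-to-low case (b).

```java
import java.util.ArrayList;
import java.util.List;

class IntervalJoinSketch {
    // Hypothetical reading type: sensor id, event time in ms, temperature.
    static final class Reading {
        final String id;
        final long timeMs;
        final double temp;

        Reading(String id, long timeMs, double temp) {
            this.id = id;
            this.timeMs = timeMs;
            this.temp = temp;
        }

        @Override
        public String toString() {
            return id + "@" + timeMs + "=" + temp;
        }
    }

    static final long FIVE_MIN_MS = 5 * 60 * 1000L;

    // Same predicate as the SQL: same id, e2 between e1 and e1 + 5 minutes
    // (inclusive, like BETWEEN), and a temperature swing larger than the threshold.
    static List<Reading[]> matches(List<Reading> events, double threshold) {
        List<Reading[]> result = new ArrayList<>();
        for (Reading e1 : events) {
            for (Reading e2 : events) {
                boolean sameId = e1.id.equals(e2.id);
                boolean inInterval = e2.timeMs >= e1.timeMs && e2.timeMs <= e1.timeMs + FIVE_MIN_MS;
                if (sameId && inInterval && Math.abs(e1.temp - e2.temp) > threshold) {
                    result.add(new Reading[] {e1, e2});
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Reading> events = List.of(
            new Reading("s1", 0L, -10.0),             // very low
            new Reading("s1", 4 * 60 * 1000L, 45.0),  // very high 4 minutes later: alarm
            new Reading("s2", 0L, 50.0),              // very high
            new Reading("s2", 7 * 60 * 1000L, -5.0)); // very low, but 7 minutes later: no alarm
        for (Reading[] pair : matches(events, 50.0)) {
            System.out.println("ALARM " + pair[0] + " -> " + pair[1]);
        }
    }
}
```

With the sample data only the s1 pair matches: the s2 drop happens 7 minutes after its high reading, outside the interval.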
If you really must handle all three cases together, then I would suggest using a KeyedProcessFunction, but that's going to be considerably more difficult, especially if you need to deal with events arriving out of order. You'll also need more than one timer, since the one-minute intervals are not in sync with the 5-minute intervals.
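For comparison, here is roughly the state and timer bookkeeping such a KeyedProcessFunction would need for a single key, simulated in plain Java without Flink: two 5-minute watches (one opened by a very low reading, one by a very high reading) plus an independent 1-minute averaging timer that re-registers itself. In Flink the deadlines would be registered with ctx.timerService().registerEventTimeTimer(...) and handled in onTimer; here they are just fields checked as time advances. The sketch assumes readings arrive in timestamp order, and the thresholds and names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (no Flink) of the per-key timer logic a KeyedProcessFunction
// could implement. Assumes one key and readings arriving in timestamp order.
class TimerSketch {
    static final long ONE_MIN = 60_000L;
    static final long FIVE_MIN = 5 * ONE_MIN;
    static final double VERY_LOW = 0.0;   // hypothetical threshold
    static final double VERY_HIGH = 40.0; // hypothetical threshold

    // What a KeyedProcessFunction would keep in keyed state.
    Long lowWatchEnd = null;  // end of the 5-minute watch opened by a very low reading
    Long highWatchEnd = null; // end of the 5-minute watch opened by a very high reading
    Long nextAvgTimer = null; // the independent 1-minute averaging timer
    double sum = 0;
    long count = 0;
    final List<String> output = new ArrayList<>();

    // Stand-in for onTimer(): fire every timer that is due at time `now`.
    void fireTimers(long now) {
        // An averaging timer at T covers [T - 1 min, T), so it fires before a reading at T.
        while (nextAvgTimer != null && nextAvgTimer <= now) {
            if (count > 0) {
                output.add("avg@" + nextAvgTimer + "=" + (sum / count));
            }
            sum = 0;
            count = 0;
            nextAvgTimer += ONE_MIN; // re-register, so the 1-minute timer keeps running
        }
        // A 5-minute watch includes its end time, so it only expires after it.
        if (lowWatchEnd != null && lowWatchEnd < now) lowWatchEnd = null;
        if (highWatchEnd != null && highWatchEnd < now) highWatchEnd = null;
    }

    // Stand-in for processElement().
    void processElement(long t, double temp) {
        fireTimers(t);
        if (nextAvgTimer == null) {
            nextAvgTimer = t - (t % ONE_MIN) + ONE_MIN; // end of the current minute
        }
        sum += temp;
        count++;
        if (temp <= VERY_LOW) {
            if (highWatchEnd != null) output.add("ALARM high->low@" + t);
            lowWatchEnd = t + FIVE_MIN; // open (or extend) the 5-minute watch
        } else if (temp >= VERY_HIGH) {
            if (lowWatchEnd != null) output.add("ALARM low->high@" + t);
            highWatchEnd = t + FIVE_MIN;
        }
    }

    public static void main(String[] args) {
        TimerSketch s = new TimerSketch();
        s.processElement(0L, -5.0);       // very low: opens a 5-minute watch
        s.processElement(30_000L, 10.0);
        s.processElement(90_000L, 20.0);  // the 1-minute timer at 60000 fires first
        s.processElement(240_000L, 45.0); // very high 4 minutes after the low: alarm
        s.fireTimers(300_001L);           // advance time so the remaining due timers fire
        s.output.forEach(System.out::println);
    }
}
```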
Upvotes: 1
Reputation: 505
Flink has the concept of windows (see the Windows section of the Flink documentation).
There is also the concept of keyed and non-keyed streams, which helps if you want to partition the data (for example by sensor) before processing it (see the Flink documentation on keyed streams).
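For a & b: use TumblingEventTimeWindows of 5 minutes and write your custom logic in the process method. Every 5 minutes you then process that window's elements together, which I believe is what you ultimately want.
For c: use TumblingEventTimeWindows of 1 minute and calculate the average temperature with the reduce method.
The code below assumes that the stream is non-keyed and uses the ProcessingTime TimeCharacteristic. With processing time the matching window assigner is TumblingProcessingTimeWindows (TumblingEventTimeWindows require timestamps and watermarks to be assigned), so that is what the code uses. It also assumes each record in data is a temperature encoded as a string, and that env, topic and kafkaProperties are defined elsewhere.
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

// ProcessingTime was the default before Flink 1.12 (this setter is deprecated in newer versions)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// For a & b: all elements of each 5-minute window are handed to process() together
data
    .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
        @Override
        public void process(Context context, Iterable<String> iterable, Collector<String> collector) throws Exception {
            for (String element : iterable) {
                // custom logic here
                // collect your result
                collector.collect(element);
            }
        }
    })
    .print(); // replace with whatever sink you need

// For c: pair each temperature with a count of 1 so reduce can sum both
data
    .map(s -> Tuple2.of(Double.parseDouble(s), 1L))
    .returns(Types.TUPLE(Types.DOUBLE, Types.LONG))
    .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .reduce(new ReduceFunction<Tuple2<Double, Long>>() {
        @Override
        public Tuple2<Double, Long> reduce(Tuple2<Double, Long> a, Tuple2<Double, Long> b) throws Exception {
            // sum the temperatures and the counts; averaging pairwise would be wrong
            return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
        }
    })
    // average = sum / count, as a string for the Kafka sink
    .map(t -> String.valueOf(t.f0 / t.f1))
    .returns(Types.STRING)
    // Add to Kafka
    .addSink(new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), kafkaProperties));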
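A ReduceFunction only ever combines two values at a time, so an average has to be carried as a (sum, count) pair and divided once at the end; reducing running averages pairwise gives the wrong result in general once a window holds more than two readings. A plain-Java illustration (no Flink; the class and method names and the three readings are hypothetical):

```java
class AverageSketch {
    // Partial aggregate a ReduceFunction could carry instead of a bare average.
    static final class SumCount {
        final double sum;
        final long count;

        SumCount(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        // Associative combine step, so the order in which elements are reduced does not matter.
        SumCount plus(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }

        double average() {
            return sum / count;
        }
    }

    // Pitfall: averaging pairwise weights later readings more heavily.
    static double naivePairwiseAverage(double[] temps) {
        double acc = temps[0];
        for (int i = 1; i < temps.length; i++) {
            acc = (acc + temps[i]) / 2;
        }
        return acc;
    }

    // Correct: reduce (sum, count) pairs, then divide once.
    static double sumCountAverage(double[] temps) {
        SumCount acc = new SumCount(0, 0);
        for (double t : temps) {
            acc = acc.plus(new SumCount(t, 1));
        }
        return acc.average();
    }

    public static void main(String[] args) {
        double[] readings = {10.0, 20.0, 30.0};
        System.out.println("naive pairwise: " + naivePairwiseAverage(readings)); // 22.5
        System.out.println("sum/count:      " + sumCountAverage(readings));      // 20.0
    }
}
```

Flink's AggregateFunction, whose accumulator can hold the sum and count, is another common way to express the same idea.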
Upvotes: 0