Running multiple use-cases on Streaming Data

I'm getting streaming sensor data from Kafka, and I need to do the following:

a. Check a variable's variations within a time period and raise an alarm if they are extreme (e.g. temperature going from very low to very high within a 5-minute period). When I see a very low temperature, I set a 5-minute timer; if a very high temperature arrives within those 5 minutes, I raise an alarm.

b. Conversely, if I first get a very high temperature, I check whether a very low temperature arrives within the 5-minute window.

c. Compute the running average of the temperature every minute and push it to Kafka. This is a continuous activity, so I need a timer that fires every minute from the start, incrementally.

I can't work out how to use timers for all these use cases in the same code. Any suggestions or advice?

Upvotes: 0

Views: 52

Answers (2)

David Anderson

Reputation: 43707

Flink's tumbling time windows are a good match for case c, but aren't such a good match for cases a and b. This is because these windows cannot be aligned to a triggering event -- they are instead always aligned to the clock (e.g., from 12:00 to 12:05). So if a high temperature event occurs at 12:04, and then the temperature drops a lot by 12:06, those two events will be in different windows.

For a and b I would instead suggest using an interval join from the DataStream API, or a time-windowed join using the Table or SQL API. Something like

SELECT *
FROM events e1, events e2
WHERE e1.id = e2.id AND
      e2.time BETWEEN e1.time AND e1.time + INTERVAL '5' MINUTE AND
      ABS(e1.temp - e2.temp) > 50

If you really must unify these three cases together, then I would suggest using a KeyedProcessFunction -- but that's going to be rather more difficult, especially if you need to worry about the events arriving out-of-order. And you'll need more than one timer, since the one-minute intervals are not in sync with the 5-minute intervals.
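To make the timer mechanics concrete, here is a plain-Java sketch (not Flink code; class and method names are illustrative, and out-of-order arrival is ignored) of the per-key bookkeeping such a KeyedProcessFunction would do for cases a and b: buffer recent readings, evict those older than the 5-minute interval (what a Flink timer would handle), and raise an alarm when any reading inside the interval differs from the new one by more than the threshold -- in either direction, covering both a and b.

```java
import java.util.ArrayDeque;

class SwingDetector {
    private final long windowMillis;
    private final double threshold;
    // Buffered readings as {timestampMillis, temperature}, oldest first.
    private final ArrayDeque<double[]> buffer = new ArrayDeque<>();

    SwingDetector(long windowMillis, double threshold) {
        this.windowMillis = windowMillis;
        this.threshold = threshold;
    }

    /** Feed one reading; returns true if it completes an extreme swing. */
    boolean onEvent(long timestampMillis, double temperature) {
        // Evict readings that fell out of the interval (timer expiry).
        while (!buffer.isEmpty() && buffer.peekFirst()[0] < timestampMillis - windowMillis) {
            buffer.removeFirst();
        }
        boolean alarm = false;
        for (double[] earlier : buffer) {
            if (Math.abs(earlier[1] - temperature) > threshold) {
                alarm = true;
                break;
            }
        }
        buffer.addLast(new double[]{timestampMillis, temperature});
        return alarm;
    }

    public static void main(String[] args) {
        SwingDetector d = new SwingDetector(300_000, 50.0); // 5 min, 50-degree swing
        System.out.println(d.onEvent(0, 5.0));       // false: nothing to compare yet
        System.out.println(d.onEvent(60_000, 80.0)); // true: 75-degree jump within 5 min
        System.out.println(d.onEvent(400_000, 5.0)); // false: earlier readings expired
    }
}
```

In actual Flink code, the buffer would live in keyed state and the eviction would be driven by registered timers rather than by comparing timestamps on each event.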

Upvotes: 1

Shreyas B

Reputation: 505

Flink has the concept of windows (see the Flink windowing documentation).

There is also the concept of keyed and non-keyed streams, which helps if you want to partition the data before processing it (see the Flink documentation on keyed streams).

  • For a & b -> I suggest using a simple tumbling window of 5 minutes and writing your custom logic in the process method. That way, every 5 minutes you process the window's elements together, which I believe is ultimately what you want.
  • For c -> again a simple tumbling window, this time of 1 minute, computing the average temperature in the window.

The code below assumes a non-keyed stream with the ProcessingTime TimeCharacteristic.


        // By default the TimeCharacteristic is ProcessingTime
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // For a & b: with processing time, use TumblingProcessingTimeWindows
        // (TumblingEventTimeWindows would never fire without timestamps/watermarks)
        data
                .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5)))
                .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<String> elements, Collector<String> collector) throws Exception {
                        for (String element : elements) {
                            // custom low/high comparison logic here

                            // collect your result
                            collector.collect(element);
                        }
                    }
                })
                .addSink(...); // push it to whatever sink you need

        // For c
        data
                .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .reduce(new ReduceFunction<String>() {
                    @Override
                    public String reduce(String s1, String s2) throws Exception {
                        // combine the two values here; note a ReduceFunction must
                        // return the same type it consumes, so a true average
                        // needs an AggregateFunction carrying sum and count
                        return s1;
                    }
                })
                // Add to Kafka
                .addSink(new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), kafkaProperties));
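One caveat on the reduce step for case c: a ReduceFunction must return the same type it consumes, so a running average needs to carry both a sum and a count, which is what Flink's AggregateFunction accumulator provides. A plain-Java sketch of that accumulator (no Flink dependencies; the class and method names are illustrative):

```java
// Illustrative accumulator for a 1-minute average window: add() folds in each
// reading, result() emits the average at window close.
class AvgAccumulator {
    double sum = 0.0;
    long count = 0;

    void add(double temp) {
        sum += temp;
        count++;
    }

    double result() {
        return count == 0 ? Double.NaN : sum / count;
    }
}

class AvgDemo {
    public static void main(String[] args) {
        AvgAccumulator acc = new AvgAccumulator();
        acc.add(10.0);
        acc.add(20.0);
        acc.add(30.0);
        System.out.println(acc.result()); // prints 20.0
    }
}
```

In Flink itself, this maps onto AggregateFunction's createAccumulator/add/getResult methods, which compute the average incrementally instead of buffering the whole window.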

Upvotes: 0
