Check whether I'm receiving the stream properly for all keys

I have the following scenario: suppose there are 20 sensors sending me a streaming feed. I apply keyBy(sensorID) to the stream and perform some operations such as averaging. This is implemented and running well (using the Flink Java API).

Initially everything goes well and all the sensors send me their feed. After a certain time, a couple of sensors may start misbehaving and I start getting an irregular feed from them, e.g. I receive a feed from 18 sensors, but 2 don't send anything for long durations.

We can assume that I already know the fixed list of sensor IDs (possibly hard-coded or in a database). How do I identify which two are not sending a feed? Where can I get the list of key IDs to compare with the list in the database?

I want to raise an alarm if I don't get a feed for a given period (e.g. 2 min, 5 min, 10 min, with increasing priority).

Has anyone implemented such a scenario using flink-streaming / patterns? Any suggestions please.

Upvotes: 0

Views: 183

Answers (3)

The pattern is very clear to me now. I've implemented the solution and it works like a charm.

If anyone needs the code, I'll be happy to share it.

Upvotes: 0

David Anderson

Reputation: 43717

I just happen to have an example of this pattern lying around. It'll need some adjustment to fit your use case, but should get you started.

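import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Event is the application's own type; it is assumed to expose timestamp().
public class TimeoutFunction extends KeyedProcessFunction<String, Event, String> {

    // timestamp of the most recent event seen for the current key
    private ValueState<Long> lastModifiedState;
    static final long TIMEOUT = 2 * 60 * 1000L; // 2 minutes

    @Override
    public void open(Configuration parameters) throws Exception {

        // register our state with the state backend
        lastModifiedState = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {

        // update our state and timer
        Long current = lastModifiedState.value();
        if (current != null) {
            // cancel the timer registered for the previous event
            ctx.timerService().deleteEventTimeTimer(current + TIMEOUT);
        }
        // state is null for the first event of a key, so guard before comparing
        current = (current == null) ? event.timestamp() : Math.max(current, event.timestamp());
        lastModifiedState.update(current);
        ctx.timerService().registerEventTimeTimer(current + TIMEOUT);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {

        // no event arrived within TIMEOUT for this key: emit alert
        String deviceId = ctx.getCurrentKey();
        out.collect(deviceId);
    }
}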

This assumes a main program that does something like this:

DataStream<String> result = stream
    .assignTimestampsAndWatermarks(new MyBoundedOutOfOrdernessAssigner(...))
    .keyBy(e -> e.deviceId)
    .process(new TimeoutFunction());

As @Dominik said, this only emits alerts for keys that have been seen at least once. You could fix that by introducing a secondary source of events that creates an artificial event for every source that should exist, and union that stream with the primary source.
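To make the seeding idea concrete, here is a minimal plain-Java simulation of it, with no Flink dependency. It is only a sketch: the class and method names (SeededTimeoutSketch, Reading, seedEvents, silentSensors) are made up for illustration, and it assumes the artificial events are all stamped with the job's start time. In a real job the seed stream would be unioned with the sensor stream before the keyBy, and the timers in TimeoutFunction would do the detection.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java simulation of "seed every known key with an artificial event".
// All names here are hypothetical; nothing in this class is Flink API.
final class SeededTimeoutSketch {

    static final long TIMEOUT_MS = 2 * 60 * 1000L; // 2 minutes

    // Hypothetical event shape: a sensor id plus an event-time timestamp in ms.
    static final class Reading {
        final String sensorId;
        final long timestamp;

        Reading(String sensorId, long timestamp) {
            this.sensorId = sensorId;
            this.timestamp = timestamp;
        }
    }

    // One artificial event per known sensor, stamped with the start time (an assumption).
    static List<Reading> seedEvents(Set<String> knownIds, long startTime) {
        List<Reading> seeds = new ArrayList<>();
        for (String id : knownIds) {
            seeds.add(new Reading(id, startTime));
        }
        return seeds;
    }

    // Union seeds with real events, keep the latest timestamp per key, and
    // return the ids whose latest event is at least TIMEOUT_MS older than 'now'.
    static Set<String> silentSensors(List<Reading> seeds, List<Reading> real, long now) {
        List<Reading> union = new ArrayList<>(seeds);
        union.addAll(real);

        Map<String, Long> lastSeen = new HashMap<>();
        for (Reading r : union) {
            lastSeen.merge(r.sensorId, r.timestamp, Math::max);
        }

        Set<String> silent = new TreeSet<>();
        for (Map.Entry<String, Long> e : lastSeen.entrySet()) {
            if (now - e.getValue() >= TIMEOUT_MS) {
                silent.add(e.getKey());
            }
        }
        return silent;
    }
}
```

Because every known id gets a seed, a sensor that never sends anything still has an entry and is reported once the timeout has passed since the start time.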

Upvotes: 0

Dominik Wosiński

Reputation: 3874

You could technically use the ProcessFunction and timers.

You could simply register a timer for each record and reset it whenever you receive data. If you schedule the timer to fire after 5 minutes of processing time, onTimer is called only if no data has arrived in that interval, and from there you can emit an alert. You can also re-register a timer after an alert has fired, which allows emitting further alerts with higher severity.

Note that this only works if all sensors are working correctly at the start. Specifically, it will only emit alerts for keys that have been seen at least once. From your description, though, it seems that it would solve your problem.
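For the escalating-severity part, the bookkeeping could look roughly like the plain-Java sketch below. It is not Flink code and the names (EscalationSketch, severity, nextTimer) are hypothetical; the 2, 5 and 10 minute thresholds are taken from the question. The idea is that when a timer fires in onTimer you emit the alert for the current level and then register the next timer at the time nextTimer returns, until the highest level is reached.

```java
// Plain-Java sketch of the escalation arithmetic; all names are hypothetical.
final class EscalationSketch {

    // Silence thresholds from the question: 2, 5 and 10 minutes, in milliseconds.
    static final long[] THRESHOLDS_MS = {2 * 60_000L, 5 * 60_000L, 10 * 60_000L};

    // Severity for a given silence duration: 0 means no alert yet,
    // 1..3 correspond to the 2, 5 and 10 minute thresholds.
    static int severity(long silenceMs) {
        int level = 0;
        for (long threshold : THRESHOLDS_MS) {
            if (silenceMs >= threshold) {
                level++;
            }
        }
        return level;
    }

    // Timestamp at which the next timer should fire after 'firedLevel' alerts
    // have already been emitted for this key, or -1 if no level is left.
    static long nextTimer(long lastSeen, int firedLevel) {
        if (firedLevel >= THRESHOLDS_MS.length) {
            return -1L;
        }
        return lastSeen + THRESHOLDS_MS[firedLevel];
    }
}
```

When a new record arrives for the key, the pending timer is deleted and the fired level goes back to 0, so the sequence starts again from the 2 minute threshold.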

Upvotes: 0
