heaplay

Apache Flink: calculate the difference in value between two consecutive events using event time

I have some energy meters that continuously produce a counter value, which is a cumulative metric, i.e. it keeps increasing until the counter is reset.

Key                Value
----------------------------------------------------------------------
Sensor1            {timestamp: "10-10-2019 10:20:30", Kwh: 10}
Sensor1            {timestamp: "10-10-2019 10:20:40", Kwh: 21}
Sensor1            {timestamp: "10-10-2019 10:20:55", Kwh: 25}
Sensor1            {timestamp: "10-10-2019 10:21:05", Kwh: 37}
Sensor1            {timestamp: "10-10-2019 10:21:08", Kwh: 43}
.
.
.

I need a real-time ETL job that computes the difference between each pair of consecutive values, in event-time order.

e.g.

10-10-2019 10:20:30  = 21 - 10 = 11
10-10-2019 10:20:40  = 25 - 21 = 4
10-10-2019 10:20:55  = 37 - 25 = 12
. 
.
.
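For a single sensor whose readings already arrive in order, this is just a pairwise subtraction over adjacent readings. A minimal plain-Java sketch (no Flink involved; the class `ConsecutiveDiff` and its `diffs` method are hypothetical names) that reproduces the example above, labelling each difference with the earlier reading's timestamp:

```java
// Hypothetical helper (plain Java, no Flink): differences between consecutive cumulative
// readings of a single sensor, ASSUMING the readings are already in event-time order.
class ConsecutiveDiff {

    // diffs[i] = kwh[i + 1] - kwh[i]; n readings give n - 1 differences.
    static long[] diffs(long[] kwh) {
        long[] out = new long[Math.max(0, kwh.length - 1)];
        for (int i = 0; i < out.length; i++) {
            out[i] = kwh[i + 1] - kwh[i];
        }
        return out;
    }

    public static void main(String[] args) {
        String[] ts = {"10-10-2019 10:20:30", "10-10-2019 10:20:40", "10-10-2019 10:20:55",
                       "10-10-2019 10:21:05", "10-10-2019 10:21:08"};
        long[] kwh = {10, 21, 25, 37, 43};
        long[] d = diffs(kwh);
        for (int i = 0; i < d.length; i++) {
            // Each difference is labelled with the earlier reading's timestamp.
            System.out.println(ts[i] + "  = " + kwh[i + 1] + " - " + kwh[i] + " = " + d[i]);
        }
    }
}
```

Note that the most recent reading never has a difference of its own until a later reading arrives.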

Moreover, events may sometimes arrive out of order.

How can I achieve this using the Apache Flink Streaming API? An example in Java would be best.

Answers (1)

David Anderson

In general, when you need to process an out-of-order stream in order, the easiest (and most performant) way is to use Flink SQL and let it do the sorting. Note that it relies on the WatermarkStrategy to decide when events can safely be considered ready to be emitted, and it will drop any late events. If you need to know about the late events, I would recommend using CEP rather than SQL with MATCH_RECOGNIZE (the approach shown below), since CEP can send late events to a side output.

For more about using Watermarks for sorting, see the tutorial about Watermarks in the Flink docs.
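To make watermark-based sorting concrete, here is a simplified, single-threaded simulation in plain Java. All names are hypothetical, and this is not how Flink implements it internally. Events are buffered in a priority queue ordered by timestamp; the watermark trails the largest timestamp seen by the out-of-orderness bound (using the same `max timestamp - bound - 1` formula as Flink's bounded-out-of-orderness watermarks); buffered events at or before the watermark are emitted in order and diffed per sensor; and events that arrive behind the watermark are dropped as late.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical, single-threaded simulation of "sort by event time using a watermark,
// then diff consecutive readings per sensor". NOT Flink's actual implementation.
class WatermarkSortDiff {

    static final class Event {
        final String sensor;
        final long ts;   // event time (any unit; seconds in the example below)
        final long kwh;  // cumulative counter value

        Event(String sensor, long ts, long kwh) {
            this.sensor = sensor;
            this.ts = ts;
            this.kwh = kwh;
        }
    }

    private final long maxOutOfOrderness;
    private long watermark = Long.MIN_VALUE;
    private long maxTsSeen = Long.MIN_VALUE;
    // Events held back until the watermark guarantees nothing earlier can still arrive.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.ts));
    // Last event emitted (in event-time order) for each sensor.
    private final Map<String, Event> previous = new HashMap<>();

    final List<String> results = new ArrayList<>(); // "sensor ts diff", ts of the earlier reading
    final List<Event> dropped = new ArrayList<>();  // late events

    WatermarkSortDiff(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    void onEvent(Event e) {
        if (e.ts <= watermark) {
            // Late: the watermark has already passed this timestamp, so drop it.
            dropped.add(e);
            return;
        }
        buffer.add(e);
        maxTsSeen = Math.max(maxTsSeen, e.ts);
        advanceWatermark(maxTsSeen - maxOutOfOrderness - 1);
    }

    // At the end of a bounded input, flush everything that is still buffered.
    void endOfInput() {
        advanceWatermark(Long.MAX_VALUE);
    }

    private void advanceWatermark(long newWatermark) {
        if (newWatermark <= watermark) {
            return;
        }
        watermark = newWatermark;
        // Everything at or before the watermark is complete: emit it in timestamp order.
        while (!buffer.isEmpty() && buffer.peek().ts <= watermark) {
            Event current = buffer.poll();
            Event prev = previous.put(current.sensor, current);
            if (prev != null) {
                results.add(prev.sensor + " " + prev.ts + " " + (current.kwh - prev.kwh));
            }
        }
    }

    public static void main(String[] args) {
        // Seconds after 10:20:00, mirroring the answer's sample data; bound = 60 s (one minute).
        WatermarkSortDiff job = new WatermarkSortDiff(60);
        job.onEvent(new Event("sensor1", 30, 10));
        job.onEvent(new Event("sensor1", 40, 21));
        job.onEvent(new Event("sensor2", 10, 28));
        job.onEvent(new Event("sensor2", 5, 20));   // out of order, but within the bound
        job.onEvent(new Event("sensor1", 55, 25));
        job.onEvent(new Event("sensor1", 65, 37));
        job.onEvent(new Event("sensor2", 180, 30)); // watermark advances to 119
        job.onEvent(new Event("sensor1", 100, 40)); // hypothetical extra reading, behind the watermark
        job.endOfInput();
        job.results.forEach(System.out::println);
        System.out.println("late events dropped: " + job.dropped.size());
    }
}
```

One simplification: this sketch advances the watermark after every event, whereas Flink emits watermarks periodically (by default every 200 ms). The last reading in `main` is a made-up late event, added only to show the dropping behaviour.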

Here's an example of how to implement your use case using Flink SQL:

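import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.sql.Timestamp;
import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

public class SortAndDiff {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Needed on Flink 1.11; from Flink 1.12 on, event time is already the default.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // (sensorId, event time, cumulative kWh); sensor2's first two readings arrive out of order.
        DataStream<Tuple3<String, Long, Long>> input = env.fromElements(
                new Tuple3<>("sensor1", "2019-10-10 10:20:30", 10L),
                new Tuple3<>("sensor1", "2019-10-10 10:20:40", 21L),
                new Tuple3<>("sensor2", "2019-10-10 10:20:10", 28L),
                new Tuple3<>("sensor2", "2019-10-10 10:20:05", 20L),
                new Tuple3<>("sensor1", "2019-10-10 10:20:55", 25L),
                new Tuple3<>("sensor1", "2019-10-10 10:21:05", 37L),
                new Tuple3<>("sensor2", "2019-10-10 10:23:00", 30L))
        // Parse the timestamp string into epoch milliseconds (in the JVM's default time zone).
        .map(new MapFunction<Tuple3<String, String, Long>, Tuple3<String, Long, Long>>() {
            @Override
            public Tuple3<String, Long, Long> map(Tuple3<String, String, Long> t) throws Exception {
                return new Tuple3<>(t.f0, Timestamp.valueOf(t.f1).toInstant().toEpochMilli(), t.f2);
            }
        })
        // Tolerate up to one minute of out-of-orderness; events behind the watermark are late.
        .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<Tuple3<String, Long, Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                    .withTimestampAssigner((event, timestamp) -> event.f1));

        // Name the fields and use f1 (epoch millis) as the rowtime (event-time) attribute.
        Table events = tableEnv.fromDataStream(input,
                $("sensorId"),
                $("ts").rowtime(),
                $("kwh"));

        // Per sensor, in event-time order, match each pair of consecutive rows and emit
        // the difference, labelled with the earlier row's timestamp.
        Table results = tableEnv.sqlQuery(
                "SELECT E.* " +
                    "FROM " + events + " " +   // concatenating a Table registers it under a generated name
                    "MATCH_RECOGNIZE ( " +
                        "PARTITION BY sensorId " +
                        "ORDER BY ts " +
                        "MEASURES " +
                            "this_step.ts AS ts, " +
                            "next_step.kwh - this_step.kwh AS diff " +
                        // Overlapping matches: the next match starts at this match's second row.
                        "AFTER MATCH SKIP TO NEXT ROW " +
                        "PATTERN (this_step next_step) " +
                        "DEFINE " +
                            "this_step AS TRUE, " +
                            "next_step AS TRUE " +
                    ") AS E"
        );

        // MATCH_RECOGNIZE only appends rows, so the result can be converted to an append stream.
        tableEnv
                .toAppendStream(results, Row.class)
                .print();

        env.execute();
    }

}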
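The `AFTER MATCH SKIP TO NEXT ROW` clause is what makes every consecutive pair appear. Because both pattern variables are defined as `TRUE`, a match can start at any row; the skip strategy only decides where the next match attempt starts. The plain-Java sketch below (hypothetical names; it models only this two-row, always-true pattern, assuming one partition's rows are already sorted by `ts`, and is not a general MATCH_RECOGNIZE engine) contrasts it with `SKIP PAST LAST ROW`, which would make matches non-overlapping and silently skip every other difference:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of PATTERN (this_step next_step) with both variables DEFINEd as TRUE,
// over one partition's rows ASSUMED to be sorted by ts already.
class SkipStrategyDemo {

    // A match covers rows [start, start + 1]. 'step' is where the next attempt starts,
    // relative to the current match start: 1 = SKIP TO NEXT ROW, 2 = SKIP PAST LAST ROW.
    static List<Long> diffs(long[] kwh, int step) {
        if (step < 1) {
            throw new IllegalArgumentException("step must be >= 1");
        }
        List<Long> out = new ArrayList<>();
        for (int start = 0; start + 1 < kwh.length; start += step) {
            out.add(kwh[start + 1] - kwh[start]); // next_step.kwh - this_step.kwh
        }
        return out;
    }

    public static void main(String[] args) {
        long[] sensor1 = {10, 21, 25, 37}; // sensor1's readings from the sample input, sorted
        System.out.println("SKIP TO NEXT ROW:   " + diffs(sensor1, 1));
        System.out.println("SKIP PAST LAST ROW: " + diffs(sensor1, 2));
    }
}
```

For sensor1's sample readings, skipping to the next row yields the differences 11, 4 and 12, while skipping past the last row yields only 11 and 12, losing the 21 to 25 step.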
