Amit Arora

Apache Flink, second stage summing on a windowed stream

I have a DataStream of Kafka events (corresponding to readings from a device) fed into the following code, which produces a per-device average of the reading in a sliding window. This works fine. Next I want to calculate the sum of all the per-device averages in the same window, and this is the part I am not able to express correctly syntactically.

This part works:

val stream = env
  // words is our Kafka topic
  .addSource(kafkaConsumer)
  // configure timestamp and watermark assigner
  .assignTimestampsAndWatermarks(new DeviceTSAssigner)
  .keyBy(_.deviceIdFull)
  .timeWindow(Time.minutes(5), Time.minutes(1))
  /* average the readings per device in the window */
  .apply { (key: String, window: TimeWindow, events: Iterable[DeviceData], out: Collector[(String, Long, Double)]) =>
    out.collect((key, window.getEnd, events.map(_.currentReading).sum / events.size))
  }

stream.print()

The output is something like

(device1,1530681420000,0.0)
(device2,1530681420000,0.0)
(device3,1530681480000,0.0)
(device4,1530681480000,0.0)
(device5,1530681480000,52066.0)
(device6,1530681480000,69039.0)
(device7,1530681480000,79939.0)
... 
...

The following code is the part I am having problems with. I am not sure exactly how to write it, but I am thinking it should be something like this:

  val avgStream = stream
    .keyBy(2) // 2 represents the window.End from stream, see code above
    .timeWindow(Time.minutes(1)) // tumbling window
    .apply { (
               key: Long,
               window: TimeWindow,
               events: Iterable[(String, Long, Double)],
               out: Collector[(Long, Double)]) =>
      out.collect( (key, events.map( _._3 ).sum ))
    }

I get the following error while compiling this code:

Error:(70, 52) type mismatch;
 found   : (Long, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(String, Long, Double)], org.apache.flink.util.Collector[(Long, Double)]) => Unit
 required: (org.apache.flink.api.java.tuple.Tuple, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(String, Long, Double)], org.apache.flink.util.Collector[?]) => Unit
                   out: Collector[(Long, Double)]) =>

I tried other variants as well, like using AggregateFunction, but could not get past compilation. It seems from the error that I need to convert the input stream elements into tuples; I have seen some code for that but am not exactly sure how to do it. I am completely new to Scala, so I think that is the main problem here, because what I want to do is not complex.
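Reading the error again, the issue seems to be the index-based keyBy: with keyBy(2) the Scala API hands the window function an org.apache.flink.api.java.tuple.Tuple as the key instead of a Long. A sketch of a variant that keys by a selector function so the key stays a Long (I have not verified this end to end; the field positions just match the tuples shown above):

val avgStream = stream
  .keyBy(_._2)                       // key by the window-end timestamp, keeps the key a Long
  .timeWindow(Time.minutes(1))       // tumbling window
  .apply { (key: Long, window: TimeWindow,
            events: Iterable[(String, Long, Double)],
            out: Collector[(Long, Double)]) =>
    out.collect((key, events.map(_._3).sum))
  }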

Updated 07/04/2018

I think I have a solution to my question. It seems to work ok, but I would still like to keep this open, hoping someone else can comment on it (the question as well as my solution).

Basically, I removed the first field (with a map), which was the device name, since we don't need it; then keyed by the timestamp from the previous stage, windowed the events into a tumbling window, and summed the second field (index 1, 0-based), which is the average reading from the previous stage.

val avgStream = stream
  .map(r => (r._2, r._3))
  .keyBy(0)
  .timeWindowAll(Time.minutes(1))
  .sum(1)
  .print()


Answers (1)

Amit Arora

I was able to answer my own question. The approach described above (see the 07/04/2018 update) works, but a better way to do this (especially if you want to do it for more than one field in the stream) is to use an AggregateFunction. I had tried that earlier as well but ran into problems because the "map" step was missing.

Once I mapped the stream in the second stage to extract the relevant fields of interest, I could use the AggregateFunction.

The Flink documentation here and this GitHub link both provide an example for this. I started with the Flink documentation example because it was very simple to understand, and then converted my code to look more like the GitHub example.
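For reference, a minimal sketch of the shape this took; the class name SumReadings is illustrative rather than my actual code, and it assumes the second-stage stream has already been mapped down to (windowEnd, perDeviceAvg) tuples as described above:

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.windowing.time.Time

// Illustrative aggregate: IN = (windowEnd, perDeviceAvg), ACC = running sum, OUT = total
class SumReadings extends AggregateFunction[(Long, Double), Double, Double] {
  override def createAccumulator(): Double = 0.0
  override def add(value: (Long, Double), acc: Double): Double = acc + value._2
  override def getResult(acc: Double): Double = acc
  override def merge(a: Double, b: Double): Double = a + b
}

val sumOfAverages = stream
  .map(r => (r._2, r._3))        // keep (windowEnd, perDeviceAvg)
  .keyBy(_._1)                   // key by the window-end timestamp
  .timeWindow(Time.minutes(1))
  .aggregate(new SumReadings)

The nice thing compared to sum(1) is that the aggregation is incremental, and extending it to several fields only means widening the accumulator type.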

