Reputation: 149
I have a DataStream of Kafka events (readings from a device) fed into the following code, which produces a per-device average of the reading over a sliding window. This works fine. Next I want to calculate the sum of all the per-device averages in the same window; this is the part I am not able to express syntactically.
This part works:
val stream = env
  // words is our Kafka topic
  .addSource(kafkaConsumer)
  // configure timestamp and watermark assigner
  .assignTimestampsAndWatermarks(new DeviceTSAssigner)
  .keyBy(_.deviceIdFull)
  .timeWindow(Time.minutes(5), Time.minutes(1))
  /* average the readings in each window */
  .apply { (key: String, window: TimeWindow, events: Iterable[DeviceData], out: Collector[(String, Long, Double)]) =>
    out.collect((key, window.getEnd, events.map(_.currentReading).sum / events.size))
  }
stream.print()
stream.print()
The output is something like:
(device1,1530681420000,0.0)
(device2,1530681420000,0.0)
(device3,1530681480000,0.0)
(device4,1530681480000,0.0)
(device5,1530681480000,52066.0)
(device6,1530681480000,69039.0)
(device7,1530681480000,79939.0)
...
...
The following code is the part I am having problems with. I am not sure exactly how to code this, but I am thinking it should be something like this:
val avgStream = stream
  .keyBy(2) // 2 is the index of window.getEnd in the tuples produced above
  .timeWindow(Time.minutes(1)) // tumbling window
  .apply { (key: Long,
            window: TimeWindow,
            events: Iterable[(String, Long, Double)],
            out: Collector[(Long, Double)]) =>
    out.collect((key, events.map(_._3).sum))
  }
I get the following error while compiling this code:
Error:(70, 52) type mismatch;
found : (Long, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(String, Long, Double)], org.apache.flink.util.Collector[(Long, Double)]) => Unit
required: (org.apache.flink.api.java.tuple.Tuple, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(String, Long, Double)], org.apache.flink.util.Collector[?]) => Unit
out: Collector[(Long, Double)]) =>
I tried other variants as well, like using AggregateFunctions, but could not get past compilation. From the error it seems I need to convert the input stream elements into tuples; I have seen some code for that but am not exactly sure how to do it. I am completely new to Scala, so I think that is the main problem here, because what I want to do is not complex.
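For reference, the error comes from keying by a field index (`keyBy(2)`), which makes the key a generic `org.apache.flink.api.java.tuple.Tuple` rather than a `Long`. One way around it (a sketch, not tested against the exact stream above) is to key by a lambda, so the key keeps its Scala type:

```scala
// Keying by a function instead of an index gives a typed key (Long here),
// so the apply lambda can declare `key: Long` directly.
val avgStream = stream
  .keyBy(_._2)                  // key by the window-end timestamp
  .timeWindow(Time.minutes(1))  // tumbling window
  .apply { (key: Long,
            window: TimeWindow,
            events: Iterable[(String, Long, Double)],
            out: Collector[(Long, Double)]) =>
    out.collect((key, events.map(_._3).sum))
  }
```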
Updated 07/04/2018
I think I have a solution to my question. It seems to work OK, but I would still like to keep this open, hoping someone else can comment on it (the question as well as my solution).
Basically, I removed the first field (with a map), which was the name of the device, since we don't need it; then I did a keyBy on the timestamp (from the previous stage), windowed the events into a tumbling window, and then summed the 2nd field (index 1, 0-based), which is the average reading from the previous stage.
val avgStream = stream
  .map(r => (r._2, r._3)) // keep only (windowEnd, avgReading)
  .keyBy(0)
  .timeWindowAll(Time.minutes(1))
  .sum(1)
  .print()
Upvotes: 1
Views: 1152
Reputation: 149
I was able to answer my own question. The approach described above (see the update of 07/04/2018) works, but a better way to do this (especially if you want to do it for multiple fields in the stream, not just one) is to use an AggregateFunction. I tried that earlier as well but ran into problems because the "map" step was missing.
Once I mapped the stream in the second stage to extract the relevant fields of interest, I could use the AggregateFunction.
The Flink documentation here and this GitHub link both provide an example of this. I started with the Flink documentation example because it was very simple to understand, and then converted my code to look more like the GitHub example.
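As a sketch (the class name and the tuple shape are assumptions based on the mapped `(windowEnd, avgReading)` stream above, not tested code), an AggregateFunction that sums the per-device averages could look roughly like the Average example in the Flink docs:

```scala
import org.apache.flink.api.common.functions.AggregateFunction

// Hypothetical AggregateFunction: sums the second field of the mapped
// (timestamp, avg) tuples; the accumulator is a running Double sum.
class SumAverages extends AggregateFunction[(Long, Double), Double, Double] {
  override def createAccumulator(): Double = 0.0
  override def add(value: (Long, Double), acc: Double): Double = acc + value._2
  override def getResult(acc: Double): Double = acc
  override def merge(a: Double, b: Double): Double = a + b
}

// Usage, mirroring the updated solution above:
// stream
//   .map(r => (r._2, r._3))
//   .keyBy(0)
//   .timeWindow(Time.minutes(1))
//   .aggregate(new SumAverages)
```

The advantage over `sum(1)` is that the accumulator can be any type, so extending this to aggregate several fields at once only means widening the accumulator.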
Upvotes: 1