I have two streams. Both contain data aggregated over one-hour windows. I want to zip these streams so that aggregations over the same time span are paired in a tuple, with an empty value if no corresponding match exists.
```java
DataStream<OneHourAggA> one =
    sourceA
        .keyBy(d -> (String) d.values.get("m"))
        .timeWindow(Time.hours(1))
        .apply(new WorkWindwFolder());

DataStream<OneHourAggB> other =
    sourceB
        .keyBy(d -> (String) d.values.get("m"))
        .timeWindow(Time.hours(1))
        .apply(new WorkWindwFolder());

DataStream<Tuple2<Option<OneHourAggA>, Option<OneHourAggB>>> zipped =
    one.???(other);
```
How can I achieve this?
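You would have to use a coGroup operation to perform a full outer join of the two aggregation results, keyed on the same field and using the same time window specification (a one-hour tumbling window) as the aggregations. This relies on event time and works because each preceding window aggregation emits only one element per key and window, and that element is assigned the maximum timestamp of its window (the window end minus 1 ms). The result therefore falls into the same one-hour window of the coGroup as its counterpart from the other stream, so the coGroup function receives both results for that key and hour, or only one if the other stream produced nothing.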
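To make this reasoning concrete, here is a minimal plain-Java sketch (no Flink dependency) that simulates what a windowed coGroup does with the two aggregated streams. It assumes event time and uses hypothetical records `AggA`/`AggB` standing in for `OneHourAggA`/`OneHourAggB`, each carrying the grouping key, the timestamp a window result receives (window end minus 1 ms) and a value. The `coGroup` method plays the role of `CoGroupFunction.coGroup`; in Flink itself the corresponding chain would be along the lines of `one.coGroup(other).where(...).equalTo(...).window(TumblingEventTimeWindows.of(Time.hours(1))).apply(...)`, with your own key selectors and function.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Plain-Java simulation (no Flink) of a windowed coGroup used as a full outer join. */
public class WindowedOuterJoinSketch {

    static final long HOUR_MS = 60L * 60L * 1000L;

    // Hypothetical stand-ins for OneHourAggA / OneHourAggB: the grouping key,
    // the timestamp a window result carries (window end - 1 ms) and a value.
    record AggA(String key, long timestamp, long value) {}
    record AggB(String key, long timestamp, long value) {}

    // One zipped output element; either side may be empty.
    record Pair(Optional<AggA> a, Optional<AggB> b) {}

    // Identifies one coGroup invocation: a key within one tumbling window.
    record GroupKey(String key, long windowStart) {}

    static final Comparator<GroupKey> ORDER =
            Comparator.comparing(GroupKey::key).thenComparingLong(GroupKey::windowStart);

    // Start of the zero-offset tumbling window containing a non-negative timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Plays the role of CoGroupFunction.coGroup: called once per (key, window)
    // with everything both inputs had for it. Each side holds at most one
    // element here because the upstream windows emit one result per key and hour.
    static void coGroup(List<AggA> first, List<AggB> second, List<Pair> out) {
        out.add(new Pair(first.stream().findFirst(), second.stream().findFirst()));
    }

    static List<Pair> zip(List<AggA> as, List<AggB> bs) {
        Map<GroupKey, List<AggA>> aGroups = new TreeMap<>(ORDER);
        Map<GroupKey, List<AggB>> bGroups = new TreeMap<>(ORDER);
        for (AggA a : as) {
            GroupKey g = new GroupKey(a.key(), windowStart(a.timestamp(), HOUR_MS));
            aGroups.computeIfAbsent(g, k -> new ArrayList<>()).add(a);
        }
        for (AggB b : bs) {
            GroupKey g = new GroupKey(b.key(), windowStart(b.timestamp(), HOUR_MS));
            bGroups.computeIfAbsent(g, k -> new ArrayList<>()).add(b);
        }
        // Visit every group present on either side: this makes it a full outer join.
        Set<GroupKey> all = new TreeSet<>(ORDER);
        all.addAll(aGroups.keySet());
        all.addAll(bGroups.keySet());
        List<Pair> out = new ArrayList<>();
        for (GroupKey g : all) {
            coGroup(aGroups.getOrDefault(g, List.of()), bGroups.getOrDefault(g, List.of()), out);
        }
        return out;
    }

    public static void main(String[] args) {
        // Window results stamped with the maximum timestamp of their window.
        List<AggA> as = List.of(
                new AggA("m1", HOUR_MS - 1, 10),
                new AggA("m1", 2 * HOUR_MS - 1, 20));
        List<AggB> bs = List.of(
                new AggB("m1", HOUR_MS - 1, 7),
                new AggB("m2", HOUR_MS - 1, 3));
        zip(as, bs).forEach(System.out::println);
    }
}
```

Running `main` prints three pairs: `m1` in the first hour matched on both sides, `m1` in the second hour with an empty B side, and `m2` in the first hour with an empty A side. The `windowStart` arithmetic shows why the maximum timestamp matters: a result stamped with the window end minus 1 ms stays inside `[start, end)`, whereas a stamp equal to the end would fall into the next window. Note that a real `CoGroupFunction` is only invoked for windows in which at least one side has elements, so a pair with two empty values never occurs.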