Atle

Reputation: 428

Using Apache Flink, how do I zip two streams on time?

I have two streams. Both contain data aggregated over 1-hour windows. I want to zip these streams so that aggregations over the same timespan are tupled together, possibly with an empty value if no corresponding match exists.

DataStream<OneHourAggA> one = 
    sourceA
      .keyBy(d -> (String) d.values.get("m"))
      .timeWindow(Time.hours(1))
      .apply(new WorkWindwFolder());

DataStream<OneHourAggB> other = 
     sourceB
       .keyBy(d -> (String) d.values.get("m"))
       .timeWindow(Time.hours(1))
       .apply(new WorkWindwFolder());

DataStream<Tuple2<Option<OneHourAggA>,Option<OneHourAggB>>> zipped = 
     one.???(other);

How can I achieve this?

Upvotes: 4

Views: 420

Answers (1)

Till Rohrmann

Reputation: 13346

You would have to use a coGroup operation to perform the outer join of the aggregation results. Apply it to the two aggregated streams (one and other, not the raw sources) and use the same time window specification for the coGroup operation as for the aggregations. This works because each preceding window generates only one element per key and window, and this element is assigned the maximum timestamp of that window, so it falls into the coGroup window covering the same hour.
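To illustrate, here is a minimal sketch of the pairing logic that the coGroup function would carry out for a single key and window. It is plain Java with no Flink dependency, so the class ZipSketch, the Pair type, and the zip method are hypothetical names, and java.util.Optional stands in for the Option from the question. In Flink, the same body would sit inside a CoGroupFunction passed to something along the lines of one.coGroup(other).where(...).equalTo(...).window(TumblingEventTimeWindows.of(Time.hours(1))).apply(...), with key selectors that depend on your aggregate types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class ZipSketch {

    /** Hypothetical pair type standing in for Flink's Tuple2. */
    public static final class Pair<A, B> {
        public final Optional<A> left;
        public final Optional<B> right;

        public Pair(Optional<A> left, Optional<B> right) {
            this.left = left;
            this.right = right;
        }
    }

    /**
     * Outer-join pairing for one key and one window.
     * Assumes each side holds at most one (non-null) aggregate,
     * as described in the answer; extra elements are ignored.
     */
    public static <A, B> List<Pair<A, B>> zip(Iterable<A> first, Iterable<B> second) {
        List<Pair<A, B>> out = new ArrayList<>();
        Optional<A> a = firstOf(first);
        Optional<B> b = firstOf(second);
        // Emit a pair when at least one side has an aggregate for this window.
        if (a.isPresent() || b.isPresent()) {
            out.add(new Pair<>(a, b));
        }
        return out;
    }

    /** Returns the first element of the iterable, or empty if there is none. */
    private static <T> Optional<T> firstOf(Iterable<T> items) {
        for (T item : items) {
            return Optional.of(item);
        }
        return Optional.empty();
    }
}
```

A side that has no aggregate for a given hour shows up as an empty Optional, which gives the "possibly with an empty value" behaviour asked for in the question.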

Upvotes: 1
