I've been trying to join two streams using a CoGroupFunction in Flink.
I have two streams:
S1
val m = env
  .addSource(new FlinkKafkaConsumer010[String]("topic-1", schema, props))
  .map(gson.fromJson(_, classOf[Master]))
  .assignAscendingTimestamps(_.time)
S2
val d = env
  .addSource(new FlinkKafkaConsumer010[String]("topic-2", schema, props))
  .map(gson.fromJson(_, classOf[Detail]))
  .assignAscendingTimestamps(_.time)
And my coGroup implementation is:
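import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// Left outer join: each Master is paired with every Detail that has the same
// orderNo in the window, or with None when there is no such Detail.
class MasterDetailOuterJoin
    extends CoGroupFunction[Master, Detail, (Master, Option[Detail])] {
  override def coGroup(
      leftElements: java.lang.Iterable[Master],
      rightElements: java.lang.Iterable[Detail],
      out: Collector[(Master, Option[Detail])]): Unit = {
    // asScala is needed to iterate a java.lang.Iterable in a Scala for loop
    for (leftElem <- leftElements.asScala) {
      var isMatch = false
      println(leftElem.orderNo)
      for (rightElem <- rightElements.asScala) {
        println(rightElem.orderNo)
        out.collect((leftElem, Some(rightElem)))
        isMatch = true
      }
      if (!isMatch) {
        out.collect((leftElem, None))
      }
    }
  }
}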
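To see what the coGroup body should emit for one window, here is a minimal sketch that applies the same left-outer-join logic to plain Scala collections, without Flink. The `Master` and `Detail` case classes (with `orderNo` and `time` fields) are assumptions reconstructed from how the question uses them, and `leftOuter` / `coGroupWindow` are hypothetical helper names.

```scala
// Hypothetical stand-ins for the question's Master and Detail types;
// only the fields the pipeline uses are modeled.
case class Master(orderNo: String, time: Long)
case class Detail(orderNo: String, time: Long)

// Same logic as MasterDetailOuterJoin.coGroup for the elements of ONE key:
// every master is paired with every detail, and a master without details
// is emitted once with None.
def leftOuter(masters: Seq[Master], details: Seq[Detail]): Seq[(Master, Option[Detail])] =
  masters.flatMap { m =>
    val matches: Seq[Option[Detail]] =
      if (details.isEmpty) Seq(None) else details.map(Some(_))
    matches.map(d => (m, d))
  }

// Flink groups a window's elements by key before calling coGroup;
// groupBy reproduces that grouping for a single window.
def coGroupWindow(masters: Seq[Master], details: Seq[Detail]): Seq[(Master, Option[Detail])] = {
  val ms = masters.groupBy(_.orderNo)
  val ds = details.groupBy(_.orderNo)
  (ms.keySet ++ ds.keySet).toSeq.sorted.flatMap { key =>
    leftOuter(ms.getOrElse(key, Seq.empty), ds.getOrElse(key, Seq.empty))
  }
}

val masters = Seq(Master("A", 1000L), Master("B", 2000L))
val details = Seq(Detail("A", 1500L), Detail("C", 3000L))
val joined = coGroupWindow(masters, details)
joined.foreach(println)
```

Key "C" exists only on the detail side, so it produces nothing, which is the expected left-outer behaviour.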
And I run it with:
m.coGroup(d)
  .where(_.orderNo)
  .equalTo(_.orderNo)
  .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
  .apply(new MasterDetailOuterJoin)
  .map(gson.toJson(_, classOf[(Master, Option[Detail])]))
  .print()
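A 5-second tumbling event-time window puts each element into `[start, start + 5000)` based on its timestamp, and emits only once a watermark reaches the window's last millisecond. The pure-Scala sketch below mimics both rules; the start formula is intended to match what Flink's `TimeWindow.getWindowStartWithOffset` computes with offset 0 for non-negative timestamps, but `windowStart` and `windowFires` are hypothetical helpers, not Flink's code.

```scala
val windowSizeMs = 5000L

// Start of the tumbling window containing ts (offset 0, ts >= 0):
// ts - (ts - offset + size) % size with offset = 0.
def windowStart(ts: Long, size: Long): Long = ts - (ts + size) % size

// A window [start, start + size) is complete once the watermark reaches
// its max timestamp, start + size - 1.
def windowFires(start: Long, size: Long, watermark: Long): Boolean =
  watermark >= start + size - 1

val starts = Seq(1000L, 4999L, 5000L, 12345L).map(windowStart(_, windowSizeMs))
println(starts) // List(0, 0, 5000, 10000)
```

Note that a watermark stuck at `Long.MinValue` (no watermarks at all) never satisfies `windowFires`, so the window never emits.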
But nothing is printed, even though there are matching orderNo values in master and detail. I monitored the Kafka topics with the console consumer, and they are working fine.
If I use an inner join instead, I get results:
m.keyBy(_.orderNo)
  .connect(d.keyBy(_.orderNo))
  .flatMap(new MasterDetailInnerJoin) // RichCoFlatMapFunction
  .map(gson.toJson(_, classOf[(Master, Detail)]))
  .print()
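`MasterDetailInnerJoin` is not shown. A common way to write such a keyed co-flatMap join is to buffer, per key, the elements seen on each side and emit a pair whenever an element meets buffered elements from the other side. The pure-Scala sketch below simulates that per-key state with mutable maps; `InnerJoinSim`, `onMaster` and `onDetail` are hypothetical names, and the case classes are assumed shapes. It emits as soon as both sides have arrived, with no windows, so it does not depend on watermarks at all.

```scala
import scala.collection.mutable

// Assumed shapes of the question's types.
case class Master(orderNo: String, time: Long)
case class Detail(orderNo: String, time: Long)

// Hypothetical simulation of a keyed inner join in a CoFlatMap: each side
// buffers its elements per orderNo and joins against the other side's buffer.
class InnerJoinSim {
  private val masters = mutable.Map.empty[String, List[Master]]
  private val details = mutable.Map.empty[String, List[Detail]]

  // Plays the role of flatMap1: one call per element of the master stream.
  def onMaster(m: Master): List[(Master, Detail)] = {
    masters(m.orderNo) = m :: masters.getOrElse(m.orderNo, Nil)
    details.getOrElse(m.orderNo, Nil).reverse.map(d => (m, d))
  }

  // Plays the role of flatMap2: one call per element of the detail stream.
  def onDetail(d: Detail): List[(Master, Detail)] = {
    details(d.orderNo) = d :: details.getOrElse(d.orderNo, Nil)
    masters.getOrElse(d.orderNo, Nil).reverse.map(m => (m, d))
  }
}

val join = new InnerJoinSim
val out1 = join.onDetail(Detail("A", 1500L)) // no master yet: nothing emitted
val out2 = join.onMaster(Master("A", 1000L)) // matches the buffered detail
val out3 = join.onMaster(Master("B", 2000L)) // no detail for B: nothing emitted
println(out1 ++ out2 ++ out3)
```

The buffers here grow without bound; a real keyed-state version would usually clear entries or use state TTL.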
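It turned out that what I was missing was:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Without it (in Flink versions before 1.12), the job runs with the processing-time characteristic, where the automatic watermark interval is 0. The timestamps from assignAscendingTimestamps are still attached, but no watermarks are ever emitted, so the 5-second event-time windows never fire and coGroup is never called. The connect/flatMap inner join works because it does not use windows. Since Flink 1.12, event time is the default and setStreamTimeCharacteristic is deprecated.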
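The effect of missing watermarks can be simulated in plain Scala. This is a simplified model, assuming a single input, ascending timestamps (watermark = max timestamp seen minus 1 ms) and 5-second tumbling windows; it ignores that a two-input operator like coGroup takes the minimum watermark of its inputs. `Event` and `firedWindows` are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical event: an orderNo with an event-time timestamp in ms.
case class Event(orderNo: String, time: Long)

val windowSizeMs = 5000L

// Start of the 5 s tumbling window containing ts (offset 0, ts >= 0).
def windowStart(ts: Long): Long = ts - (ts + windowSizeMs) % windowSizeMs

// Returns the start of every window that fired while processing `events`
// in order. With watermarks disabled the watermark stays at Long.MinValue.
def firedWindows(events: Seq[Event], emitWatermarks: Boolean): List[Long] = {
  var watermark = Long.MinValue
  val open = mutable.SortedSet.empty[Long]
  val fired = mutable.ArrayBuffer.empty[Long]
  for (e <- events) {
    open += windowStart(e.time)
    if (emitWatermarks) watermark = math.max(watermark, e.time - 1)
    // A window [start, start + size) fires once watermark >= start + size - 1.
    val ready = open.filter(start => watermark >= start + windowSizeMs - 1)
    fired ++= ready
    open --= ready
  }
  fired.toList
}

val events = Seq(Event("A", 1000L), Event("A", 2000L), Event("B", 6000L), Event("C", 11000L))
println(firedWindows(events, emitWatermarks = true))  // List(0, 5000)
println(firedWindows(events, emitWatermarks = false)) // List()
```

With watermarks, each window fires once a later element pushes the watermark past its end; without them, nothing ever fires, which matches the empty output in the question.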