Reputation: 75
We have a Flink job that performs an intervalJoin on two streams, both of which consume events from Kafka. Here is the example code:
val articleEventStream: DataStream[ArticleEvent] = env.addSource(articleEventSource)
  .assignTimestampsAndWatermarks(new ArticleEventAssigner)
val feedbackEventStream: DataStream[FeedbackEvent] = env.addSource(feedbackEventSource)
  .assignTimestampsAndWatermarks(new FeedbackEventAssigner)

articleEventStream
  .keyBy(article => article.id)
  .intervalJoin(feedbackEventStream.keyBy(feedback => feedback.article.id))
  // A feedback event joins an article event when its timestamp lies in
  // [article timestamp - 5 s, article timestamp + 10 s]
  .between(Time.seconds(-5), Time.seconds(10))
  .process(new ProcessJoinFunction[ArticleEvent, FeedbackEvent, String] {
    override def processElement(
        left: ArticleEvent,
        right: FeedbackEvent,
        ctx: ProcessJoinFunction[ArticleEvent, FeedbackEvent, String]#Context,
        out: Collector[String]): Unit = {
      out.collect(left.name + " got feedback: " + right.feedback)
    }
  })
class ArticleEventAssigner extends AssignerWithPunctuatedWatermarks[ArticleEvent] {
  val bound: Long = 5 * 1000

  override def checkAndGetNextWatermark(lastElement: ArticleEvent, extractedTimestamp: Long): Watermark = {
    new Watermark(extractedTimestamp - bound)
  }

  override def extractTimestamp(element: ArticleEvent, previousElementTimestamp: Long): Long = {
    element.occurredAt
  }
}

class FeedbackEventAssigner extends AssignerWithPunctuatedWatermarks[FeedbackEvent] {
  val bound: Long = 5 * 1000

  override def checkAndGetNextWatermark(lastElement: FeedbackEvent, extractedTimestamp: Long): Watermark = {
    new Watermark(extractedTimestamp - bound)
  }

  override def extractTimestamp(element: FeedbackEvent, previousElementTimestamp: Long): Long = {
    element.occurredAt
  }
}
However, we do not see any joined output. We checked that each stream continuously emits elements with timestamps and proper watermarks. Does anyone have a hint about what the possible reasons could be?
Upvotes: 1
Views: 620
Reputation: 75
After checking several parts of the job (timestamps, watermarks, triggers), I noticed my mistake: the interval I used,
between(Time.seconds(-5), Time.seconds(10))
is simply too small, so elements from the two streams never fell within the same interval and nothing was joined. This might sound obvious, but since I am new to Flink, I did not know where to look. So my lesson is: if the join produces no output, check the interval bounds against the actual timestamps in both streams. Thanks to all for the comments!
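To illustrate why the bounds matter, here is a minimal sketch of the time condition an interval join applies, written in plain Scala with no Flink dependency. It assumes Flink's default inclusive bounds; the object name `IntervalJoinCheck`, the helper `joins`, and all timestamps are hypothetical and only serve the illustration.

```scala
// Plain Scala sketch of the interval-join time condition (not Flink's actual code).
object IntervalJoinCheck {
  // True when a right-side event at rightTs falls inside
  // [leftTs + lowerMs, leftTs + upperMs], with both bounds inclusive.
  def joins(leftTs: Long, rightTs: Long, lowerMs: Long, upperMs: Long): Boolean =
    rightTs >= leftTs + lowerMs && rightTs <= leftTs + upperMs

  def main(args: Array[String]): Unit = {
    val articleTs = 100000L // hypothetical article event time in ms
    val lower = -5000L      // corresponds to Time.seconds(-5)
    val upper = 10000L      // corresponds to Time.seconds(10)

    // Feedback 8 s after the article: inside the interval.
    println(joins(articleTs, articleTs + 8000L, lower, upper))  // true
    // Feedback 60 s after the article: outside the interval, so no join.
    println(joins(articleTs, articleTs + 60000L, lower, upper)) // false
  }
}
```

If the feedback events in your data typically arrive much later than the article events, as in the second case, no pair ever satisfies the condition and the join stays silent. Widening the upper bound to cover the real gap should produce output, at the cost of Flink keeping more elements in state.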
Upvotes: 1