Reputation: 21
I'm going through Flink tutorial materials from dataArtisans and for some reason when I get to the sample file PopularPlacesFromKafka.scala I don't get any output sent to stdout.
...
// find popular places
val popularSpots = rides
// match ride to grid cell and event type (start or end)
.map(new GridCellMatcher)
// partition by cell id and event type
.keyBy( k => k )
// build sliding window
.timeWindow(Time.minutes(15), Time.minutes(5))
// count events in window
.apply{ (key: (Int, Boolean), window, vals, out: Collector[(Int, Long, Boolean, Int)]) =>
out.collect( (key._1, window.getEnd, key._2, vals.size) )
}
// print result on stdout
popularSpots.print()
...
I've confirmed that data is being pulled from Kafka ok, and it seems to be something when it attempts to do the 'timeWindow' operation that I get no output. If I remove the 'timeWindow' operation I can see the 'keyBy' data being output. Is there something obvious that I'm missing?
Upvotes: 1
Views: 1068
Reputation: 21
In case anyone has this same issue this was my problem.
My kafka topic had multiple partitions, but was producing all of the test data to a single partition (0), once I had >1 Kafka consumers, all of the consumers except for the one assigned to partition 0 aren't receiving any data, and thus not sending any watermarks down the operator chain - that causes the window functions to stop emitting data (that's also why it works fine with ProcessingTime in those situations). Here's a relevant JIRA about it:
https://issues.apache.org/jira/browse/FLINK-5479
Upvotes: 1
Reputation: 43499
In general there are several reasons why a Flink job might not produce any output, but a very common reason has to do with watermarking. Flink's event time clock only advances when the current watermark advances, and so without watermarks, a event time window will never fire.
In the case of the Flink training exercises, the taxi ride sources produce watermarks for you. But now that you are working with a Kafka source instead, you will have to implement a timestamp extractor and watermark generator, and then call assignTimestampsAndWatermarks
on your stream (see the documentation). A BoundedOutOfOrdernessTimestampExtractor
where the delay matches the delay configured by the job that wrote into Kafka will work well.
Upvotes: 0
Reputation: 18987
Did you configure an appropriate speedup for the source? By default (without a speedup factor), the source emulates the original data, i.e., it emits records at the same rate as they were originally generated. That means it takes 1 minute to produce 1 minute of data.
The window operator aggregates every 5 minutes the last 15 minutes of data. Consequently, it will take 5 minutes until the window operator produces the first result.
If you set the speedup factor to 600, you'll get 10 minutes of data in 1 second.
Upvotes: 1