Reputation: 18754
I have a stream execution configured as

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<Record> stream = env.addSource(new FlinkKafkaConsumer(
        SystemsCpu.TOPIC,
        ConfluentRegistryAvroDeserializationSchema.forGeneric(SystemsCpu.SCHEMA, registry),
        config)
        .setStartFromLatest());
DataStream<Anomaly> anomalies = stream
        .keyBy(x -> x.get("host").toString())
        .window(SlidingEventTimeWindows.of(Time.minutes(20), Time.seconds(20))) // produces output with TumblingEventTimeWindows
        .process(new AnomalyDetector())
        .name("anomaly-detector");
public class AnomalyDetector extends ProcessWindowFunction<Record, Anomaly, String, TimeWindow> {
    @Override
    public void process(String key, Context context, Iterable<Record> input, Collector<Anomaly> out) {
        var anomaly = new Anomaly();
        anomaly.setValue(1.0);
        out.collect(anomaly);
    }
}
However, for some reason SlidingEventTimeWindows does not produce any output to be processed by the AnomalyDetector (i.e. process is not triggered at all). If I use TumblingEventTimeWindows, for example, it works as expected.
Any ideas what might be causing this? Am I using SlidingEventTimeWindows incorrectly?
Upvotes: 1
Views: 417
Reputation: 43439
When doing any sort of event time windowing it is necessary to provide a WatermarkStrategy. Watermarks mark a spot in the stream and signal that the stream is complete up through some specific point in time. Event time windows can only be triggered by the arrival of a sufficiently large watermark.
See the docs for details, but it could look something like this:
DataStream<MyType> timestampedEvents = events
        .assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((event, timestamp) -> event.timestamp));
However, since you are using Kafka, it's usually better to have the Flink Kafka consumer do the watermarking:
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy...);
DataStream<MyType> stream = env.addSource(kafkaSource);
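Putting those two snippets together, a minimal end-to-end sketch might look like this (MyType, its timestamp and host fields, "myTopic", schema, and props are placeholder assumptions, not names from your code):

// Sketch: a Kafka source that assigns timestamps and watermarks itself,
// feeding a sliding event-time window. MyType etc. are placeholders.
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);

// Tolerate events arriving up to 10 seconds out of order.
kafkaSource.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((event, timestamp) -> event.timestamp));

// With watermarks flowing, the sliding event-time window can now fire.
env.addSource(kafkaSource)
        .keyBy(event -> event.host)
        .window(SlidingEventTimeWindows.of(Time.minutes(20), Time.seconds(20)))
        .process(new AnomalyDetector());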
Note that if you use this latter approach, and if your events are in temporal order within each Kafka partition, you can take advantage of the per-partition watermarking that the Flink Kafka source provides, and use WatermarkStrategy.forMonotonousTimestamps() rather than the bounded-out-of-orderness strategy, as sketched below. This has a number of advantages.
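For example, a sketch of that variant (again, MyType is a placeholder):

// Per-partition watermarking with strictly ascending timestamps.
// Only safe if events are in order within each Kafka partition.
kafkaSource.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<MyType>forMonotonousTimestamps()
                .withTimestampAssigner((event, timestamp) -> event.timestamp));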
By the way, this is unrelated to your question, but you should be aware that by specifying SlidingEventTimeWindows.of(Time.minutes(20), Time.seconds(20)), every event will be copied into 60 overlapping windows (the 20-minute window size divided by the 20-second slide).
Upvotes: 3
Reputation: 1015
You are using SlidingEventTimeWindows, but your stream execution environment is configured for processing time by default. Either use SlidingProcessingTimeWindows, or configure your environment for event time like so:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Event time will also require a timestamp assigner; you can find more info here:
https://www.ververica.com/blog/stream-processing-introduction-event-time-apache-flink?hs_amp=true
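As a minimal sketch of both steps together, assuming the pre-1.12 TimeCharacteristic API this answer uses (MyType and its timestamp field are placeholders):

// Switch the environment to event time (pre-Flink-1.12 API).
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Assign timestamps and watermarks, tolerating 10 seconds of disorder.
DataStream<MyType> timestamped = stream
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<MyType>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(MyType event) {
                        return event.timestamp;
                    }
                });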
Upvotes: 1