I have a very simple streaming pipeline set up in Apache Flink. The pipeline works, and I was able to apply a ProcessFunction to the input data stream like this:
DataStream<MeasurementData> data = env.addSource(consumer);
DataStream<MeasurementData> dataProcessed = data.process(new FFT());
dataProcessed.print();
dataProcessed.addSink(new FlinkKafkaProducer011<>(
        "localhost:9092",             // Kafka broker host:port
        OUTPUT_TOPIC,                 // Topic to write to
        new MeasurementDataSchema())  // Serializer
);
Now I'd like to apply a ProcessWindowFunction that operates on windows of a fixed duration instead of applying the function to each incoming data point. I tried it like this:
DataStream<MeasurementData> dataProcessed = data.timeWindowAll(Time.minutes(5))
        .process(new MyProcessWindowFunction());
And the definition of MyProcessWindowFunction:
public static class MyProcessWindowFunction
        extends ProcessAllWindowFunction<MeasurementData, MeasurementData, TimeWindow> {

    @Override
    public void process(Context context, Iterable<MeasurementData> input, Collector<MeasurementData> out) {
        long count = 0;
        for (MeasurementData data : input) {
            // `matrices` is a field defined elsewhere in the class (not shown here)
            for (int frequencyCounter = 0; frequencyCounter < data.data.size(); frequencyCounter++) {
                matrices[frequencyCounter].addElement(data.u, data.v, data.data.get(frequencyCounter).get(0));
            }
            count++;
            out.collect(data);
        }
    }
}
But this function never seems to be invoked. I tried placing print statements in it and also stepped through the whole program with a debugger. Is there anything I'm missing? Any hint is appreciated.
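Found the problem: the environment was set to use EventTime instead of ProcessingTime, while my data does not contain any event timestamps. In event time, a window is only evaluated once a watermark passes the end of the window; since no timestamps and watermarks were being assigned, the windows never fired and the ProcessAllWindowFunction was never called. Switching the time characteristic to processing time (env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)) makes the windows fire; the alternative is to keep event time and assign timestamps and watermarks with assignTimestampsAndWatermarks(...).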
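To illustrate why nothing happens without watermarks, here is a small simulation in plain Java. It does not use the Flink API; the class EventTimeWindowSim and its methods are hypothetical. It assumes the rule Flink's event-time trigger applies to tumbling windows: each element goes into the window [start, start + size) that contains its timestamp, and a window fires only once the watermark reaches start + size - 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical, Flink-free simulation of event-time tumbling windows. */
public class EventTimeWindowSim {
    private final long size;
    // window start -> number of buffered elements in that window
    private final TreeMap<Long, Integer> pending = new TreeMap<>();
    // Long.MIN_VALUE means "no watermark seen yet"
    private long watermark = Long.MIN_VALUE;

    public EventTimeWindowSim(long size) {
        this.size = size;
    }

    /** Buffers an element in the tumbling window that contains its timestamp. */
    public void add(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        pending.merge(start, 1, Integer::sum);
    }

    /** Advances the watermark and returns the element counts of windows that fire. */
    public List<Integer> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Integer> fired = new ArrayList<>();
        while (!pending.isEmpty()) {
            long start = pending.firstKey();
            long maxTimestamp = start + size - 1; // last timestamp inside the window
            if (maxTimestamp > watermark) {
                break; // the watermark has not passed this window yet
            }
            fired.add(pending.pollFirstEntry().getValue());
        }
        return fired;
    }

    public int pendingWindows() {
        return pending.size();
    }

    public static void main(String[] args) {
        EventTimeWindowSim sim = new EventTimeWindowSim(5 * 60_000L); // 5-minute windows
        sim.add(10_000L);
        sim.add(20_000L);
        sim.add(310_000L);
        // No watermark has arrived, so both windows are still buffered.
        System.out.println("pending without watermark: " + sim.pendingWindows());
        // The first window [0, 300000) fires once the watermark reaches 299999.
        System.out.println("fired at watermark 299999: " + sim.advanceWatermark(299_999L));
    }
}
```

Running main prints that two windows are pending before any watermark arrives, and that only the first window (holding two elements) fires once the watermark reaches 299999. If advanceWatermark is never called, which corresponds to a source that emits no watermarks, the windows stay buffered forever, matching the behaviour described in the question.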