When I try to aggregate elements using a window and a fold function, some of the elements never get aggregated. I am consuming elements from Kafka (value:0, value:1, value:2, value:3) and aggregating them into odd and even values.
Output is:
{even=[0, 2, 4], odd=[1, 3]}
{even=[6, 8], odd=[5, 7, 9]}
{even=[14, 16, 18], odd=[15, 17]}
{even=[20, 22], odd=[19, 21, 23]}
{even=[24, 26, 28], odd=[25, 27]}
The numbers 10 through 13 are missing here, and this happens for random ranges of numbers. Can someone suggest what is wrong with the code below, and how I can make sure that all elements are processed?
public static class Splitter implements
        FlatMapFunction<String, Tuple3<String, String, List<String>>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void flatMap(String value,
            Collector<Tuple3<String, String, List<String>>> out) throws Exception {
        // Records look like "value:N"; route N into the "even" or "odd" group.
        String[] vals = value.split(":");
        if (vals.length > 1 && Integer.parseInt(vals[1]) % 2 == 0) {
            out.collect(new Tuple3<String, String, List<String>>(
                    "test", "even", Arrays.asList(vals[1])));
        } else {
            out.collect(new Tuple3<String, String, List<String>>(
                    "test", "odd", Arrays.asList(vals[1])));
        }
    }
}

DataStream<Map<String, List<String>>> streamValue = kafkaStream
        .flatMap(new Splitter())
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(3000)))
        .trigger(CustomizedCountTrigger.of(5L)) //.trigger(CountTrigger.of(2))
        .fold(new HashMap<String, List<String>>(),
                new FoldFunction<Tuple3<String, String, List<String>>,
                        Map<String, List<String>>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Map<String, List<String>> fold(
                    Map<String, List<String>> accumulator,
                    Tuple3<String, String, List<String>> value) throws Exception {
                // Append the new value to the list for its odd/even key.
                if (accumulator.get(value.f1) != null) {
                    List<String> list = new ArrayList<>();
                    list.addAll(accumulator.get(value.f1));
                    list.addAll(value.f2);
                    accumulator.put(value.f1, list);
                } else {
                    accumulator.put(value.f1, value.f2);
                }
                return accumulator;
            }
        });

streamValue.print();
env.execute("window test");
}
public class CustomizedCountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long maxCount;

    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private CustomizedCountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window,
            TriggerContext ctx) throws Exception {
        // Count elements per key and window; fire and purge once maxCount is reached.
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window,
            TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window,
            TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public String toString() {
        return "CountTrigger(" + maxCount + ")";
    }

    public static <W extends Window> CustomizedCountTrigger<W> of(long maxCount) {
        return new CustomizedCountTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}
Answer:
I started writing the first part of this answer before noticing that your custom trigger makes the fact that you are using a TumblingEventTime window mostly irrelevant, but I want to include my original thoughts anyway, since I am not entirely sure why you would use an event-time window when you aren't really using event time. My response after realizing this is below the original, under "Proper Answer".
Are you running this with a parallelism of one, or more than one? The reason I ask is that with a parallelism greater than one (and a Kafka topic comprised of multiple partitions), messages can be received and processed in a non-sequential order. A message whose timestamp advances the watermark can then cause the window to be processed, and any following message with an event time earlier than the new watermark (a.k.a. a "late" message) will be dropped.
So for example: suppose you have 20 elements and the event time of each is as follows:
message1: eventTime: 1000, message2: eventTime: 2000, etc.
And suppose your event-time window is 5001 ms long.
Now messages message1 through message9 come through in order. The first window will be processed and will contain messages 1-5 (message6 is what causes that window to be processed). Now, if message11 comes in before message10, it will cause the window containing messages 6-9 to be processed. And when message10 arrives next, the watermark has already advanced past message10's event time, causing it to be dropped as a "late" event.
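If you did want to keep the event-time window, the usual mitigation is to assign watermarks with an explicit out-of-orderness bound and to allow some lateness on the window. Here is a minimal sketch, reusing your Splitter and Flink's BoundedOutOfOrdernessTimestampExtractor; the timestamp parsing, the 2-second bounds, and the OddEvenFold class name (your fold function extracted into a named class) are my assumptions, not taken from your code:

DataStream<Map<String, List<String>>> streamValue = kafkaStream
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(2)) {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public long extractTimestamp(String element) {
                        // Assumption: the numeric payload of "value:N" doubles as the event time.
                        return Long.parseLong(element.split(":")[1]) * 1000L;
                    }
                })
        .flatMap(new Splitter())
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(3000)))
        // Late-but-within-bound events update the window instead of being dropped.
        .allowedLateness(Time.seconds(2))
        .fold(new HashMap<String, List<String>>(), new OddEvenFold());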
Proper Answer
Instead of using an event-time window and a custom trigger, try using a countWindow.
So replace this:
.window(TumblingEventTimeWindows.of(Time.milliseconds(3000)))
.trigger(CustomizedCountTrigger.of(5L)) //.trigger(CountTrigger.of(2))
With this:
.countWindow(5L)
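A count window fires purely on the number of elements seen per key, so no watermark is involved and nothing can be dropped as "late". Here is a minimal, self-contained sketch of that behavior (my own toy example, not your full job; the parity keying mirrors your even/odd split):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CountWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
           // Key by parity, mirroring the even/odd split in the question.
           .map(n -> Tuple2.of(n % 2 == 0 ? "even" : "odd", String.valueOf(n)))
           .returns(Types.TUPLE(Types.STRING, Types.STRING))
           .keyBy(0)
           // Fires once 5 elements have arrived for a key; time plays no role.
           .countWindow(5L)
           .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + "," + b.f1))
           .print();

        env.execute("countWindow sketch");
    }
}

With ten input elements, each key collects exactly five, so both windows fire exactly once and every element is accounted for.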