Reputation: 582
I have the following pipeline:
pipeline
    .apply("read", ... )).setCoder(keyValue)
    .apply("window", toWindow)
    .apply("modify-key",
        MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
            .via((SerializableFunction<KV<String, MetricsEvent>, KV<String, Integer>>)
                kv -> KV.of(kv.getService(), kv.getValue().getSensors().size())
            ))
    .apply("combine-per-key", Combine.perKey(new CombineValues()))
    .apply("write", ConsoleOutput.of(kv ->
        "####### "
            + LocalDateTime.now()
            + " ServiceName: " + kv.getKey()
            + " Metrics: " + kv.getValue()
            + " #######"
    ));
pipeline.run();
The window definition is as follows:
final Duration fixedWindowDuration = Duration.standardSeconds(300);
final Duration alignedDuration = Duration.standardSeconds(300);
final Duration delayWindow = Duration.standardSeconds(10);
final int maxElements = 200;
final Window<KV<String, ServiceMetricsEvent>> toWindow =
    Window.<KV<String, ServiceMetricsEvent>>into(FixedWindows.of(fixedWindowDuration))
        .triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(
                AfterProcessingTime.pastFirstElementInPane().alignedTo(alignedDuration)
                    .plusDelayOf(delayWindow)
            ).withLateFirings(AfterPane.elementCountAtLeast(maxElements)))
        .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
        .discardingFiredPanes();
And the combine function is as follows:
private static final class CombineValues extends Combine.CombineFn<
        Integer,
        List<Integer>,
        Integer> {

    @Override
    public List<Integer> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<Integer> addInput(List<Integer> accumulator, Integer input) {
        accumulator.add(input);
        return accumulator;
    }

    @Override
    public List<Integer> mergeAccumulators(Iterable<List<Integer>> accumulators) {
        List<Integer> merged = new ArrayList<>();
        for (List<Integer> acc : accumulators) {
            merged.addAll(acc);
        }
        return merged;
    }

    @Override
    public Integer extractOutput(List<Integer> accumulator) {
        int sum = 0;
        for (int value : accumulator) {
            sum += value;
        }
        return sum;
    }
}
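For readers unfamiliar with the four CombineFn steps, here is a minimal plain-Java sketch of how they fit together. It does not use Apache Beam; the class name CombineLifecycleSketch and the combine driver are hypothetical, and the way a real runner splits input into bundles is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CombineLifecycleSketch {

    // The same four steps as CombineValues, written as plain static methods
    // so the sketch runs without Apache Beam on the classpath.
    static List<Integer> createAccumulator() {
        return new ArrayList<>();
    }

    static List<Integer> addInput(List<Integer> accumulator, Integer input) {
        accumulator.add(input);
        return accumulator;
    }

    static List<Integer> mergeAccumulators(Iterable<List<Integer>> accumulators) {
        List<Integer> merged = new ArrayList<>();
        for (List<Integer> acc : accumulators) {
            merged.addAll(acc);
        }
        return merged;
    }

    static Integer extractOutput(List<Integer> accumulator) {
        int sum = 0;
        for (int value : accumulator) {
            sum += value;
        }
        return sum;
    }

    // Hypothetical driver (an assumption, not Beam's actual scheduling):
    // fold each bundle into its own accumulator, merge the partial
    // accumulators, then extract the final output.
    static int combine(List<List<Integer>> bundles) {
        List<List<Integer>> partials = new ArrayList<>();
        for (List<Integer> bundle : bundles) {
            List<Integer> acc = createAccumulator();
            for (Integer value : bundle) {
                acc = addInput(acc, value);
            }
            partials.add(acc);
        }
        return extractOutput(mergeAccumulators(partials));
    }

    public static void main(String[] args) {
        List<List<Integer>> bundles = Arrays.asList(
            Arrays.asList(3, 5),
            Arrays.asList(7),
            Arrays.asList(1, 4));
        System.out.println(combine(bundles)); // prints 20
    }
}
```

Because the accumulator here is a list that is only summed at the end, the result does not depend on how the inputs are split into bundles.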
The pipeline and the combine work fine during the first windows. This is what I get when I grep the log file:
####### 2023-02-05T11:00:10.109 ServiceName: app1 Metrics: 396055 #######
####### 2023-02-05T11:00:10.138 ServiceName: app2 Metrics: 29207 #######
####### 2023-02-05T11:00:10.139 ServiceName: app3 Metrics: 109534 #######
But after a couple of cycles, the same key starts appearing more than once, and the number of duplicates grows with each window:
####### 2023-02-05T11:25:10.069 ServiceName: app1 Metrics: 1348109 #######
####### 2023-02-05T11:25:10.097 ServiceName: app2 Metrics: 85744 #######
####### 2023-02-05T11:25:10.098 ServiceName: app3 Metrics: 13439 #######
####### 2023-02-05T11:25:10.099 ServiceName: app4 Metrics: 279245 #######
####### 2023-02-05T11:30:10.051 ServiceName: app1 Metrics: 1953436 #######
####### 2023-02-05T11:30:10.058 ServiceName: app2 Metrics: 92101 #######
####### 2023-02-05T11:30:10.059 ServiceName: app3 Metrics: 67226 #######
####### 2023-02-05T11:30:10.060 ServiceName: app4 Metrics: 314705 #######
####### 2023-02-05T11:35:10.030 ServiceName: app1 Metrics: 153819 #######
####### 2023-02-05T11:35:10.035 ServiceName: app1 Metrics: 1706140 #######
####### 2023-02-05T11:35:10.037 ServiceName: app2 Metrics: 14598 #######
####### 2023-02-05T11:35:10.038 ServiceName: app2 Metrics: 117421 #######
####### 2023-02-05T11:35:10.039 ServiceName: app3 Metrics: 85252 #######
####### 2023-02-05T11:35:10.039 ServiceName: app4 Metrics: 33790 #######
####### 2023-02-05T11:35:10.040 ServiceName: app4 Metrics: 321070 #######
With every cycle, each key appears one more time. I know that windows can sometimes overlap, or that a window can fire more than once, and that this could produce several groups with the same key, but in this case the duplicates keep accumulating.
Upvotes: 0
Views: 152
Reputation: 582
The problem was the window: Combine.perKey outputs one value per key per window.
final Duration fixedWindowDuration = Duration.standardSeconds(300);
final Window<KV<String, ServiceMetricsEvent>> toWindow =
    Window.<KV<String, ServiceMetricsEvent>>into(new GlobalWindows())
        .triggering(
            AfterWatermark
                .pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(fixedWindowDuration)))
        .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
        .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
        .discardingFiredPanes();
With a single global window that fires every x seconds, I now get a single value per key:
####### 2023-02-05T17:23:07.729 ServiceName: app1 Metrics: 1924332 #######
####### 2023-02-05T17:23:18.722 ServiceName: app2 Metrics: 76192 #######
####### 2023-02-05T17:23:18.926 ServiceName: app3 Metrics: 103526 #######
####### 2023-02-05T17:23:23.376 ServiceName: app4 Metrics: 361210 #######
####### 2023-02-05T17:28:09.176 ServiceName: app1 Metrics: 1932797 #######
####### 2023-02-05T17:28:19.695 ServiceName: app2 Metrics: 129393 #######
####### 2023-02-05T17:28:21.738 ServiceName: app3 Metrics: 53782 #######
####### 2023-02-05T17:28:33.449 ServiceName: app4 Metrics: 356302 #######
####### 2023-02-05T17:33:09.550 ServiceName: app1 Metrics: 1992730 #######
####### 2023-02-05T17:33:37.975 ServiceName: app2 Metrics: 352540 #######
####### 2023-02-05T17:33:37.980 ServiceName: app3 Metrics: 62694 #######
####### 2023-02-05T17:33:54.220 ServiceName: app4 Metrics: 78748 #######
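To illustrate the "one value per key per window" behaviour, here is a rough plain-Java sketch that groups the same events first by (key, fixed window) and then by key alone. It does not use Apache Beam and does not model triggers, panes, or the watermark; the Event class, the sample data, and the helper names are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PerKeyPerWindowSketch {

    // Hypothetical event: a key, an event-time timestamp in seconds, and a value.
    static final class Event {
        final String key;
        final long timestampSeconds;
        final int value;

        Event(String key, long timestampSeconds, int value) {
            this.key = key;
            this.timestampSeconds = timestampSeconds;
            this.value = value;
        }
    }

    // Start of the fixed window containing the timestamp, assuming windows
    // aligned to 0 and non-negative timestamps.
    static long windowStart(long timestampSeconds, long windowSizeSeconds) {
        return timestampSeconds - (timestampSeconds % windowSizeSeconds);
    }

    // Grouping by (key, window): one result per key for every window with data.
    static Map<String, Integer> sumPerKeyPerWindow(List<Event> events, long windowSizeSeconds) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Event e : events) {
            String group = e.key + "@" + windowStart(e.timestampSeconds, windowSizeSeconds);
            sums.merge(group, e.value, Integer::sum);
        }
        return sums;
    }

    // Grouping by key only, as with a single global window: one result per key.
    static Map<String, Integer> sumPerKey(List<Event> events) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Event e : events) {
            sums.merge(e.key, e.value, Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("app1", 10, 5),
            new Event("app1", 290, 7),
            new Event("app1", 310, 2),
            new Event("app2", 20, 1));
        System.out.println(sumPerKeyPerWindow(events, 300)); // prints {app1@0=12, app1@300=2, app2@0=1}
        System.out.println(sumPerKey(events));               // prints {app1=14, app2=1}
    }
}
```

In the first grouping app1 appears twice because its events fall into two different 300-second windows; in the second it appears once.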
Upvotes: 0