Reputation: 21
We are using aggregators to keep a count of exceptions during processing:
    public class BigTableWriter extends DoFn<String, Void> {
        private Aggregator<Integer, Integer> errorAggregator;

        public BigTableWriter(CloudBigtableOptions options) {
            errorAggregator = createAggregator("errors", new Sum.SumIntegerFn());
        }

        @Override
        public void processElement(DoFn<String, Void>.ProcessContext c) {
            try {
                // ... do work here
            } catch (Exception ex) {
                errorAggregator.addValue(1);
            }
        }
    }
We'd like to make this more granular rather than keeping a single aggregator that collects all errors. The documentation says that an aggregator is typically created in the constructor. Is it possible to create an aggregator for each exception type inside our catch block? For instance, we want to do something like this:
    public class BigTableWriter extends DoFn<String, Void> {
        private Map<String, Aggregator<Integer, Integer>> aggregatorMap;

        public BigTableWriter(CloudBigtableOptions options) {
            aggregatorMap = new HashMap<>();
        }

        @Override
        public void processElement(DoFn<String, Void>.ProcessContext c) {
            try {
                // ... do work here
            } catch (Exception ex) {
                aggregateException(ex.getCause().getMessage());
            }
        }

        public void aggregateException(String exceptionMessage) {
            Aggregator<Integer, Integer> aggregator = null;
            if (!aggregatorMap.containsKey(exceptionMessage)) {
                // Create the aggregator lazily and remember it for later errors.
                aggregator = createAggregator(exceptionMessage, new Sum.SumIntegerFn());
                aggregatorMap.put(exceptionMessage, aggregator);
            } else {
                aggregator = aggregatorMap.get(exceptionMessage);
            }
            aggregator.addValue(1);
        }
    }
Upvotes: 2
Views: 94
Reputation: 4031
Unfortunately, no. The current logic for initializing aggregators requires that they be known at graph construction time (that is, created during DoFn construction). It's a good feature request, though. I created an issue to track it here: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/55
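One possible workaround, given that constraint, is to decide on a fixed set of exception types up front and register one counter per type at construction time, with a catch-all bucket for anything else. The sketch below illustrates the pattern in plain Java so it runs without the SDK. The class `ErrorCounters`, the `errors.` name prefix, and the use of `AtomicInteger` are assumptions for illustration only; in a real DoFn each counter would instead be an aggregator created in the constructor with `createAggregator(name, new Sum.SumIntegerFn())`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: all counters are fixed when the object is constructed,
// mirroring the requirement that aggregators exist at graph construction time.
class ErrorCounters {
    static final String OTHER = "errors.other";

    private final Map<String, AtomicInteger> counters = new LinkedHashMap<>();

    ErrorCounters(List<Class<? extends Exception>> knownTypes) {
        for (Class<? extends Exception> type : knownTypes) {
            // In a DoFn, this is where createAggregator(...) would be called.
            counters.put(nameFor(type), new AtomicInteger());
        }
        // Catch-all bucket for exception types that were not registered.
        counters.put(OTHER, new AtomicInteger());
    }

    static String nameFor(Class<?> type) {
        return "errors." + type.getSimpleName();
    }

    // Increments the counter for the exception's exact class,
    // or the catch-all counter if that class was not registered.
    // Subclasses of a registered type are not matched.
    void record(Throwable ex) {
        AtomicInteger counter = counters.get(nameFor(ex.getClass()));
        if (counter == null) {
            counter = counters.get(OTHER);
        }
        counter.incrementAndGet();
    }

    // Returns the current count, or 0 for a name that was never registered.
    int count(String name) {
        AtomicInteger counter = counters.get(name);
        return counter == null ? 0 : counter.get();
    }
}
```

Keying on the exception class rather than the message keeps the set of names small and known in advance; messages often embed row keys or timestamps, which would produce an unbounded number of counters.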
Upvotes: 2