Chris Rollins

Reputation: 21

Create aggregators dynamically for exceptions

We are using aggregators to keep a count of exceptions during processing:

public class BigTableWriter extends DoFn<String, Void> {

    private Aggregator<Integer, Integer> errorAggregator;
    public BigTableWriter(CloudBigtableOptions options) {
       errorAggregator = createAggregator("errors",new Sum.SumIntegerFn());
    }

    @Override
    public void processElement(DoFn<String, Void>.ProcessContext c){
        try {
          ....do work here
        }
        catch(Exception ex){
           errorAggregator.addValue(1);
        }

    }

}

We'd like to make this more granular rather than keeping a single aggregator that collects all errors. The documentation says that an aggregator is typically created in the constructor. Is it possible to create an aggregator for each exception type inside our catch block? For instance, we want to do something like this:

public class BigTableWriter extends DoFn<String, Void> {

    private Map<String, Aggregator<Integer, Integer>> aggregatorMap;
    public BigTableWriter(CloudBigtableOptions options) {         
       aggregatorMap = new HashMap<>();
    }

    @Override
    public void processElement(DoFn<String, Void>.ProcessContext c){
        try {
          ....do work here
        }
        catch(Exception ex){
          aggregateException(ex.getCause().getMessage());
        }
    }

    public void aggregateException(String exceptionMessage) {
       Aggregator<Integer, Integer> aggregator = aggregatorMap.get(exceptionMessage);
       if (aggregator == null) {
          // First time we see this message: create an aggregator and remember it
          aggregator = createAggregator(exceptionMessage, new Sum.SumIntegerFn());
          aggregatorMap.put(exceptionMessage, aggregator);
       }

       // Count this occurrence
       aggregator.addValue(1);
    }

}

Upvotes: 2

Views: 94

Answers (1)

Frances

Reputation: 4031

Unfortunately, no. The current logic for initializing aggregators requires that they be known at graph construction time (i.e., created during DoFn construction). It's a good feature request, though. I created an issue to track it here: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/55
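
Given that constraint, one possible workaround is to declare a fixed set of counters up front, one per exception type you expect plus a catch-all, and pick among them in the catch block. The following is only a sketch of that idea, not an SDK feature. `ErrorCounters`, the `errors.` name prefix, and `errors.other` are hypothetical names, and `AtomicLong` stands in for an aggregator so that the example compiles without the SDK. It keys on the exception class rather than the message, on the assumption that class names form a small, known set while messages do not.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not part of the Dataflow SDK): every counter is
// created in the constructor, mirroring the rule that aggregators must
// exist before processing starts.
final class ErrorCounters {
    static final String OTHER = "errors.other";

    private final Map<String, AtomicLong> counters;

    ErrorCounters(List<Class<? extends Throwable>> expectedTypes) {
        Map<String, AtomicLong> m = new LinkedHashMap<>();
        for (Class<? extends Throwable> type : expectedTypes) {
            m.put(nameFor(type), new AtomicLong());
        }
        // Catch-all for exception types that were not declared up front.
        m.put(OTHER, new AtomicLong());
        counters = Collections.unmodifiableMap(m);
    }

    static String nameFor(Class<? extends Throwable> type) {
        return "errors." + type.getSimpleName();
    }

    // Increments the counter for the exception's exact class, or the
    // catch-all if that class was not declared (subclasses are not matched).
    void record(Throwable t) {
        AtomicLong counter = counters.get(nameFor(t.getClass()));
        if (counter == null) {
            counter = counters.get(OTHER);
        }
        counter.incrementAndGet();
    }

    // Returns the current count, or 0 for a name that was never declared.
    long count(String name) {
        AtomicLong counter = counters.get(name);
        return counter == null ? 0L : counter.get();
    }
}
```

In a DoFn, the equivalent would be to fill a map in the constructor with one `createAggregator(name, new Sum.SumIntegerFn())` call per expected exception type, then call `addValue(1)` on the matching entry in the catch block. No aggregator is created during processing, so the set of names stays fixed at graph construction time.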

Upvotes: 2
