Jeremy Lewi

How do I create user-defined counters in Dataflow?

How can I create my own counters in my DoFns?

In my DoFn, I'd like to increment a counter every time a condition is met while processing a record, and I'd like this counter to sum those increments across all records.

Answers (1)

Jeff Gardner

You can use Aggregators. The total value of each Aggregator is shown in the Dataflow monitoring UI.
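
To make the semantics concrete without depending on the Dataflow SDK, here is a minimal plain-Java sketch of a counter that is incremented whenever a condition holds and summed across all records. It loosely models the combining step by assuming each bundle keeps a local partial sum and the partials are then added together. `SumCounter`, `processBundle`, and `total` are hypothetical names for illustration only; they are not part of the Dataflow API.

```java
import java.util.List;
import java.util.function.Predicate;

public class ConditionalCounterSketch {
    // Hypothetical stand-in for a Sum-combined counter: each bundle keeps
    // its own partial sum, and the partials are later added together.
    static final class SumCounter {
        private long partial = 0;

        void addValue(long v) {
            partial += v;
        }

        long partial() {
            return partial;
        }
    }

    // Processes one "bundle" of records, adding 1 to the counter
    // for every record that satisfies the condition.
    static long processBundle(List<String> bundle, Predicate<String> condition) {
        SumCounter counter = new SumCounter();
        for (String record : bundle) {
            if (condition.test(record)) {
                counter.addValue(1);
            }
        }
        return counter.partial();
    }

    // Adds the per-bundle partial sums into a single total.
    static long total(List<List<String>> bundles, Predicate<String> condition) {
        long sum = 0;
        for (List<String> bundle : bundles) {
            sum += processBundle(bundle, condition);
        }
        return sum;
    }

    public static void main(String[] args) {
        List<List<String>> bundles = List.of(
            List.of("", "a", ""),
            List.of("b", ""),
            List.of("c"));
        // Count empty records across all bundles.
        System.out.println(total(bundles, String::isEmpty)); // prints 3
    }
}
```

The condition here (empty strings) is arbitrary; in a real DoFn the increment would happen inside processElement whenever your own condition is met.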

Here is an example where I experimented with Aggregators in a pipeline that simply has numOutputShards workers each sleep for sleepSecs seconds. The GenFakeInput PTransform at the beginning just returns a flattened PCollection<String> of size numOutputShards:

// sleepSecs (assumed to be a long) and LOG are fields of the enclosing class (not shown).
PCollection<String> output = p
    // GenFakeInput emits numOutputShards placeholder elements.
    .apply(new GenFakeInput(options.getNumOutputShards()))
    .apply(ParDo.named("Sleep").of(new DoFn<String, String>() {
      // Each Aggregator's total is shown in the monitoring UI under its name.
      private Aggregator<Long> tSleepSecs;
      private Aggregator<Integer> tWorkers;
      private Aggregator<Long> tExecTime;
      private long startTimeMillis;

      @Override
      public void startBundle(Context c) {
        tSleepSecs = c.createAggregator("Total Slept (sec)", new Sum.SumLongFn());
        tWorkers = c.createAggregator("Num Workers", new Sum.SumIntegerFn());
        tExecTime = c.createAggregator("Total Wallclock (sec)", new Sum.SumLongFn());
        // Remember when this bundle started so finishBundle can measure elapsed time.
        startTimeMillis = System.currentTimeMillis();
      }

      @Override
      public void finishBundle(Context c) {
        // Elapsed wall-clock seconds for this bundle (long division truncates).
        tExecTime.addValue((System.currentTimeMillis() - startTimeMillis) / 1000);
      }

      @Override
      public void processElement(ProcessContext c) {
        try {
          LOG.info("Sleeping for {} seconds.", sleepSecs);
          tSleepSecs.addValue(sleepSecs);  // summed over every element
          tWorkers.addValue(1);            // 1 per element, so the total counts elements
          TimeUnit.SECONDS.sleep(sleepSecs);
        } catch (InterruptedException e) {
          LOG.info("Ignoring caught InterruptedException during sleep.");
        }
        c.output(c.element());
      }
    }));
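
One detail in finishBundle: `(System.currentTimeMillis() - startTimeMillis)/1000` is long division, so each bundle's contribution to "Total Wallclock (sec)" is truncated to whole seconds, and a bundle that finishes in under a second adds 0. The plain-Java sketch below (hypothetical helper names, no Dataflow SDK) compares summing truncated per-bundle seconds with summing milliseconds and converting once.

```java
public class WallclockTruncationSketch {
    // Mirrors the finishBundle() arithmetic: elapsed millis / 1000 with long division.
    static long truncatedSeconds(long elapsedMillis) {
        return elapsedMillis / 1000;
    }

    // Sum of per-bundle truncated seconds, as a Sum aggregator fed by
    // finishBundle() would accumulate it.
    static long sumOfTruncated(long[] bundleMillis) {
        long total = 0;
        for (long ms : bundleMillis) {
            total += truncatedSeconds(ms);
        }
        return total;
    }

    // Alternative: accumulate milliseconds and convert to seconds once at the end.
    static long truncatedSum(long[] bundleMillis) {
        long totalMillis = 0;
        for (long ms : bundleMillis) {
            totalMillis += ms;
        }
        return totalMillis / 1000;
    }

    public static void main(String[] args) {
        long[] bundleMillis = {1500, 1500, 900};
        System.out.println(sumOfTruncated(bundleMillis)); // prints 2
        System.out.println(truncatedSum(bundleMillis));   // prints 3
    }
}
```

If sub-second bundles matter, one option is to add the raw millisecond difference to the aggregator (for example under a name like "Total Wallclock (ms)") and convert when reading the total; this is a suggestion, not something the pipeline above does.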
