Haden Hooyeon Lee

Reputation: 305

A way to use "aggregator" (custom counter) within Combine step?

My team uses a lot of aggregators (custom counters) in many of our Dataflow pipelines, for monitoring and analysis purposes.

We mostly write DoFn classes to do so, but we sometimes use Combine.perKey(), by writing our own combine class that implements SerializableFunction<Iterable<T>, S> (usually in our case, T and S are the same). Some of the jobs we run have a small fraction of very hot keys, and we are looking to utilize some of the features offered by Combine (such as hot key fanout), but there is one issue with this approach.

It appears that aggregators are only available within a DoFn. Is there a way around this, or is this likely to be added as a feature in the future? We mostly use custom counters to count events and objects of different types for analysis and monitoring. In some cases we can probably apply another DoFn after the Combine step to do this, but in others we really need to count things during the combine process itself. For instance, we want to know the distribution of objects over keys, to understand how many hot keys we have and where the line falls between hot keys and very hot keys. There are a few other cases that seem tricky to us.

I searched around, but I couldn't find many resources on how to use aggregators during the Combine step, so any help would be really appreciated!

If needed, I can describe what kind of Combine step we use and what we are trying to count, but that will take some time, and I'd prefer a general solution.

Upvotes: 0

Views: 346

Answers (1)

Ben Chambers

Reputation: 6130

This is not currently possible. In the future (as part of Apache Beam), it is likely that you will be able to define metrics (which are like aggregators) within a CombineFn, which should address this.

In the meantime, for your use case you can do as you describe. You can have a Combine.perKey(), and then have multiple steps consuming the result -- one for your actual processing and others to report various metrics.

You could also look at the methods in CombineFns which allow creating a composed CombineFn. For instance, you could use your CombineFn and a simple Count, so that the reporting DoFn can report the number of elements in each key (consuming the Count) and the actual processing DoFn can consume the result of your CombineFn.
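To make the shape of that idea concrete, here is a minimal plain-Java sketch. It deliberately does not use the Dataflow/Beam API: `ComposedCombineSketch`, `withCount`, `combinePerKey`, `keySizeHistogram` and `kv` are hypothetical names invented for illustration, and the in-memory grouping only stands in for what Combine.perKey() would do. In a real pipeline the composition would be built with the CombineFns helpers and a Count, and the histogram would be reported from a DoFn.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Illustrative only: models "your combine + a count" without any pipeline SDK.
public class ComposedCombineSketch {

    /** The user's combined value together with the number of inputs for that key. */
    public static final class CombinedWithCount<S> {
        public final S value;
        public final long count;
        CombinedWithCount(S value, long count) { this.value = value; this.count = count; }
    }

    /** Wraps a combine function so its result also carries the input count. */
    public static <T, S> Function<Iterable<T>, CombinedWithCount<S>> withCount(
            Function<Iterable<T>, S> userCombine) {
        return inputs -> {
            long n = 0;
            for (T ignored : inputs) n++;
            return new CombinedWithCount<>(userCombine.apply(inputs), n);
        };
    }

    /** In-memory stand-in for a per-key combine: group by key, then apply the composed function. */
    public static <K, T, S> Map<K, CombinedWithCount<S>> combinePerKey(
            List<Map.Entry<K, T>> records, Function<Iterable<T>, S> userCombine) {
        Map<K, List<T>> grouped = new LinkedHashMap<>();
        for (Map.Entry<K, T> r : records) {
            grouped.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
        }
        Function<Iterable<T>, CombinedWithCount<S>> composed = withCount(userCombine);
        Map<K, CombinedWithCount<S>> out = new LinkedHashMap<>();
        for (Map.Entry<K, List<T>> e : grouped.entrySet()) {
            out.put(e.getKey(), composed.apply(e.getValue()));
        }
        return out;
    }

    /** "Reporting" step: number of keys per power-of-two size bucket (1, 2, 4, 8, ...). */
    public static <K, S> Map<Long, Integer> keySizeHistogram(Map<K, CombinedWithCount<S>> combined) {
        Map<Long, Integer> histogram = new TreeMap<>();
        for (CombinedWithCount<S> c : combined.values()) {
            histogram.merge(Long.highestOneBit(c.count), 1, Integer::sum);
        }
        return histogram;
    }

    /** Small helper to build a key/value record. */
    public static <K, T> Map.Entry<K, T> kv(K key, T value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = new ArrayList<>();
        for (int i = 1; i <= 5; i++) records.add(kv("a", i)); // "a" is the hot key here
        records.add(kv("b", 10));
        records.add(kv("c", 7));
        records.add(kv("c", 8));
        records.add(kv("d", 1));

        // The "actual" combine: a sum over the values of one key.
        Function<Iterable<Integer>, Integer> sum = xs -> {
            int s = 0;
            for (int x : xs) s += x;
            return s;
        };

        Map<String, CombinedWithCount<Integer>> combined = combinePerKey(records, sum);
        System.out.println(combined.get("a").value);    // consumed by the processing step
        System.out.println(keySizeHistogram(combined)); // consumed by the reporting step
    }
}
```

The point of carrying the count next to the combined value is that the reporting step can bucket keys by size (and so see where hot keys start) without touching the processing step's input.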

Upvotes: 1
