Reputation: 305
My team uses a lot of aggregators (custom counters) in many of the Dataflow pipelines we run for monitoring and analysis. We mostly write DoFn classes to do this, but we sometimes use Combine.perKey() with our own combine class that implements SerializableFunction<Iterable<T>, S> (in our case, T and S are usually the same). Some of our jobs have a small fraction of very hot keys, and we would like to use some of the features Combine offers (such as hot key fanout), but there is one issue with this approach.
It appears that aggregators are only available within a DoFn, and I am wondering whether there is a way around this, or whether this feature is likely to be added in the future. Mostly, we use a set of custom counters to count certain events/objects of different types for analysis and monitoring. In some cases we can probably apply another DoFn after the Combine step to do this, but in other cases we really need to count things during the combine process. For instance, we want to know the distribution of objects over keys, to understand how many hot keys we have and where the line between hot keys and very hot keys falls. There are a few other cases that seem tricky to us.
I searched around, but I couldn't find many resources on how to use aggregators during the Combine step, so any help would be really appreciated!
If needed, I can describe what kind of Combine step we use and what we are trying to count, but that will take some time, and I'd like to find a general solution to this.
Upvotes: 0
Views: 346
Reputation: 6130
This is not currently possible. In the future (as part of Apache Beam), it is likely to be possible to define metrics (which are similar to aggregators) within a CombineFn, which should address this.
In the meantime, for your use case you can do as you describe: apply Combine.perKey(), and then have multiple steps consume the result, one for your actual processing and others to report various metrics.
You could also look at the methods in CombineFns, which allow creating a composed CombineFn. For instance, you could compose your CombineFn with a simple Count, so that the reporting DoFn can report the number of elements in each key (consuming the Count output) and the actual processing DoFn can consume the result of your CombineFn.
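To make the composition idea concrete, here is a minimal plain-Java model of it. It is not Dataflow or Beam code and does not use the CombineFns API; it only illustrates how one per-key pass can carry two accumulators at once (the user's partial result and an element count), so that a "processing" consumer reads one half and a "reporting" consumer reads the other. The names ComposedCombineSketch, Composed, combinePerKey and countHotKeys, and the hot-key threshold, are hypothetical. The user's combine class is modeled as a BinaryOperator<T>, an assumption that fits the case where T and S are the same. In a real pipeline you would use the CombineFns composition methods with Combine.perKey() instead; check the SDK documentation for their exact signatures.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of the idea behind a composed CombineFn. This is NOT the
// Dataflow/Beam API: it only shows how one per-key pass can produce both the
// user's combined value and an element count for a separate reporting step.
final class ComposedCombineSketch {

    // Hypothetical output of the composed combine for one key:
    // the user's result plus the number of elements seen for that key.
    static final class Composed<T> {
        final T value;
        final long count;

        Composed(T value, long count) {
            this.value = value;
            this.count = count;
        }
    }

    // Groups (key, value) pairs by key. Each key's accumulator carries both the
    // user's partial result and a running element count, so both outputs come
    // from a single pass. Assumes userCombine is associative and commutative.
    static <K, T> Map<K, Composed<T>> combinePerKey(
            List<Map.Entry<K, T>> input, BinaryOperator<T> userCombine) {
        Map<K, Composed<T>> out = new HashMap<>();
        for (Map.Entry<K, T> e : input) {
            out.merge(e.getKey(), new Composed<>(e.getValue(), 1L),
                (acc, next) -> new Composed<>(
                    userCombine.apply(acc.value, next.value), acc.count + next.count));
        }
        return out;
    }

    // "Reporting" consumer: reads only the count half of each result and
    // returns how many keys reached the given (hypothetical) hot-key threshold.
    static <K, T> long countHotKeys(Map<K, Composed<T>> combined, long threshold) {
        return combined.values().stream().filter(c -> c.count >= threshold).count();
    }

    public static void main(String[] args) {
        Map<String, Composed<Integer>> result = combinePerKey(
            List.of(Map.entry("a", 1), Map.entry("a", 2), Map.entry("a", 3), Map.entry("b", 10)),
            Integer::sum);
        // A "processing" consumer would use result.get(key).value;
        // the "reporting" consumer only needs the counts.
        System.out.println("a -> value=" + result.get("a").value + ", count=" + result.get("a").count);
        System.out.println("hot keys (count >= 2): " + countHotKeys(result, 2));
    }
}
```

Running main prints that key "a" combined to 6 from 3 elements, and that 1 key meets the threshold of 2. Keeping both accumulators in one pass is what lets the element count ride along with the real combine instead of requiring a second grouping step.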
Upvotes: 1