Reputation: 20653
I'm using PrometheusReporter on Flink, and have some counters I want to export with the key of a keyed stream. I don't have the key in open, so I'm trying to do it in processElement instead, something like:
// job
myStream.keyBy(Foo::getKey)
    .process(new FooOperator())
    .name("foo");

// operator
public class FooOperator extends KeyedProcessFunction<String, Foo, Bar> {
    private transient Counter counter;

    @Override
    public void processElement(Foo value, Context ctx, Collector<Bar> out) throws Exception {
        // Registered lazily because no key is available in open()
        if (counter == null) {
            counter = getRuntimeContext().getMetricGroup().addGroup("key", value.getKey()).counter("my_counter");
        }
        counter.inc();
        out.collect(new Bar(value));
    }
}
But I still get:
Name collision: Group already contains a Metric with the name 'my_counter'. Metric will not be reported.
Anything I could do? Maybe there is a better way I'm failing to find?
My idea would be to get whatever counter already exists there, but I can't find a way to do it without reflection...
Upvotes: 0
Views: 1838
Reputation: 43707
A given instance of a KeyedProcessFunction is multiplexed across many different keys. The open method is called just once, when no specific key is in context, so you can't create a per-key metric there.
I believe there are two problems with what you've tried so far. First, you are going to need to create a separate Counter for every distinct key, not just one per KeyedProcessFunction instance. Second, I'm not sure that addGroup("key", value.getKey()) is going to work here. If you still have problems after fixing the first issue, then maybe try addGroup("key").addGroup(value.getKey()) instead.
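To make the first point concrete, here is a minimal sketch of keeping one metric object per key, so that each key's counter is registered at most once per operator instance. PerKeyMetricCache is a hypothetical helper, not part of Flink. It is written against a generic factory so it compiles on its own, and the Flink wiring in the trailing comment is an untested assumption based on the code in the question.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Caches one metric object per key so that the factory (the code that
 * registers the metric) runs only the first time a key is seen.
 * Hypothetical helper for illustration; not a Flink class.
 */
final class PerKeyMetricCache<M> {
    // Plain HashMap, on the assumption that a single operator instance
    // calls processElement from one thread at a time.
    private final Map<String, M> byKey = new HashMap<>();
    private final Function<String, M> factory;

    PerKeyMetricCache(Function<String, M> factory) {
        this.factory = factory;
    }

    /** Returns the cached metric for this key, creating it on first use. */
    M get(String key) {
        return byKey.computeIfAbsent(key, factory);
    }

    /** Number of distinct keys seen so far (grows with the keyspace). */
    int size() {
        return byKey.size();
    }
}

// Assumed wiring inside FooOperator (needs Flink on the classpath):
//
//   private transient PerKeyMetricCache<Counter> counters;
//
//   public void processElement(Foo value, Context ctx, Collector<Bar> out) {
//       if (counters == null) {
//           counters = new PerKeyMetricCache<>(key ->
//               getRuntimeContext().getMetricGroup()
//                   .addGroup("key", key)
//                   .counter("my_counter"));
//       }
//       counters.get(value.getKey()).inc();
//       out.collect(new Bar(value));
//   }
```

The map holds one entry per distinct key for the lifetime of the operator instance, so its size tracks the size of the keyspace.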
And finally, if your keyspace is large, and especially if it is unbounded, this whole approach is questionable. Flink's metric system isn't designed to scale up to huge numbers of metrics.
Upvotes: 1