caarlos0

Reputation: 20653

Apache Flink: Metrics with key as label

I'm using the PrometheusReporter in Flink and have some counters that I want to export labeled with the key of a keyed stream.

I don't have the key in open, so I'm trying to do it in processElement instead, something like this:

// job
myStream.keyBy(Foo::getKey)
        .process(new FooOperator())
        .name("foo");

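// operator
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class FooOperator extends KeyedProcessFunction<String, Foo, Bar> {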

  private transient Counter counter;

  @Override
  public void processElement(Foo value, Context ctx, Collector<Bar> out) throws Exception {
    // Runs only once per operator instance, for whichever key happens to arrive first
    if (counter == null) {
      counter = getRuntimeContext().getMetricGroup().addGroup("key", value.getKey()).counter("my_counter");
    }
    counter.inc();
    out.collect(new Bar(value));
  }
}

But I still get:

Name collision: Group already contains a Metric with the name 'my_counter'. Metric will not be reported.

Is there anything I can do? Or is there a better approach that I'm missing?

My idea was to retrieve the counter that already exists there, but I can't find a way to do that without reflection.

Upvotes: 0

Views: 1838

Answers (1)

David Anderson

Reputation: 43707

A given instance of a KeyedProcessFunction is multiplexed across many different keys. The open method is called just once, when no specific key is in context, so you can't create a per-key metric there.

I believe there are two problems with what you've tried so far. First, you are going to need to create a separate Counter for every distinct key -- not just one per KeyedProcessFunction instance. Second, I'm not sure that addGroup("key", value.getKey()) is going to work here. If you still have problems after fixing the first issue, then maybe try addGroup("key").addGroup(value.getKey()) instead.
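A minimal sketch of the first point, assuming String keys as in the question's KeyedProcessFunction<String, Foo, Bar>: keep a Map from key to Counter on the operator instance and create each key's counter lazily, the first time that key shows up. So that it runs without Flink on the classpath, MetricGroupStub and SimpleCounter are hypothetical stand-ins (not Flink classes) that only mimic the "same name registered twice in one group" collision. In a real job, the registration call would be the question's getRuntimeContext().getMetricGroup().addGroup("key", key).counter("my_counter"), made from processElement with the key taken from ctx.getCurrentKey().

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PerKeyCounters {

    // Hypothetical stand-in for org.apache.flink.metrics.Counter.
    static final class SimpleCounter {
        private long count;

        void inc() {
            count++;
        }

        long getCount() {
            return count;
        }
    }

    // Hypothetical stand-in for a Flink MetricGroup: registering the same metric
    // name twice under the same key label is counted as a collision, mirroring
    // the "Name collision" warning from the question.
    static final class MetricGroupStub {
        private final Set<String> registered = new HashSet<>();
        int collisions = 0;

        SimpleCounter counter(String keyLabel, String name) {
            if (!registered.add(keyLabel + "/" + name)) {
                collisions++;
            }
            return new SimpleCounter();
        }
    }

    private final MetricGroupStub group;
    // One counter per distinct key, registered once and reused afterwards.
    private final Map<String, SimpleCounter> counters = new HashMap<>();

    PerKeyCounters(MetricGroupStub group) {
        this.group = group;
    }

    // In Flink this would run inside processElement, with key = ctx.getCurrentKey().
    void onElement(String key) {
        counters.computeIfAbsent(key, k -> group.counter("key=" + k, "my_counter")).inc();
    }

    long count(String key) {
        SimpleCounter c = counters.get(key);
        return c == null ? 0 : c.getCount();
    }

    public static void main(String[] args) {
        MetricGroupStub group = new MetricGroupStub();
        PerKeyCounters metrics = new PerKeyCounters(group);
        for (String key : new String[] {"a", "b", "a", "c", "a"}) {
            metrics.onElement(key);
        }
        // Prints "a=3 b=1 c=1 collisions=0"
        System.out.println("a=" + metrics.count("a") + " b=" + metrics.count("b")
                + " c=" + metrics.count("c") + " collisions=" + group.collisions);
    }
}
```

The cache is a plain field on the operator instance rather than keyed state, since metric registrations belong to that instance. It also answers the "get whatever counter already exists" question without reflection: you simply keep your own reference. Note that it grows by one entry per distinct key, which is why the caveat about large keyspaces matters.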

And finally, if your keyspace is large, and especially if it is unbounded, this whole approach is questionable. Flink's metric system isn't designed to scale up to huge numbers of metrics.

Upvotes: 1
