Samuel Chase

Reputation: 45

Unable to send application-level metrics from Flink 1.15.0 job

I'm in the process of upgrading a Flink 1.9.1 cluster to Flink 1.15.0, which required updating the Flink APIs used in the job code.

The updated job runs fine on Flink 1.15.0, except that it does not emit any application-level metrics.

According to the documentation, metrics are registered as follows:

private transient Counter numEvents;

@Override
public void open(Configuration config) {
    // Register the metric once the runtime context is available, i.e. in open()
    numEvents = getRuntimeContext().getMetricGroup().counter("foo.bar.numEvents");
}

// and later, e.g. inside the processing method:

numEvents.inc();

// etc.

and then the flink-metrics-statsd reporter handles shipping the metrics to a StatsD endpoint.
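For context, the reporter itself is enabled in flink-conf.yaml. A minimal sketch, where the reporter name stsd, the host, and the port are placeholders rather than values from the question (in 1.15 the factory.class option is the recommended form):

```yaml
metrics.reporter.stsd.factory.class: org.apache.flink.metrics.statsd.StatsDReporterFactory
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125
```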

Using tcpdump on the StatsD endpoint, I have confirmed that system-level metrics are being sent; however, the metrics I explicitly registered in the code are nowhere to be seen.
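For anyone repeating this check: StatsD datagrams are plain-text lines of the form name:value|type, so the missing counters can be grepped for by name in the tcpdump output. A minimal sketch of splitting such a line into its parts (the sample datagram below is illustrative, not captured from this job):

```java
public class StatsDDatagram {

    /** Splits a StatsD datagram "name:value|type" into its three parts. */
    static String[] parse(String datagram) {
        int colon = datagram.indexOf(':');
        int pipe = datagram.indexOf('|');
        return new String[] {
            datagram.substring(0, colon),        // metric name
            datagram.substring(colon + 1, pipe), // value
            datagram.substring(pipe + 1)         // type: c, g, ms, s, ...
        };
    }

    public static void main(String[] args) {
        String[] parts = parse("foo.bar.numEvents:42|g");
        System.out.println(parts[0] + " = " + parts[1] + " (" + parts[2] + ")");
    }
}
```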

The same metric-registering code ran fine on the old Flink 1.9.1 cluster and emitted all of these metrics, and according to the 1.15.0 documentation my code appears to be correct: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/metrics/

In the Flink logs, I can see the following error:

2022-07-19 16:50:07,975 WARN  org.apache.flink.runtime.metrics.MetricRegistryImpl          [] - Error while reporting metrics
java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Double (java.lang.String and java.lang.Double are in module java.base of loader 'bootstrap')
        at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:37) ~[?:?]
        at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:27) ~[?:?]
        at org.apache.flink.metrics.statsd.StatsDReporter.reportGauge(StatsDReporter.java:136) ~[?:?]
        at org.apache.flink.metrics.statsd.StatsDReporter.report(StatsDReporter.java:106) ~[?:?]
        at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:495) [flink-dist-1.15.0.jar:1.15.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]

I'm not sure whether that error is related.

It would be great to hear from anyone who knows what's going on here. Thanks. 🙏

Upvotes: 0

Views: 515

Answers (1)

Martijn Visser

Reputation: 2078

You're suffering from one (or both) of these issues:

  1. https://issues.apache.org/jira/browse/FLINK-27487 - Upgrading to the latest Flink 1.15 patch release (currently Flink 1.15.1) should resolve this.
  2. https://issues.apache.org/jira/browse/FLINK-28488 - Applies if you're using the legacy FlinkKafkaConsumer / FlinkKafkaProducer. That issue is not resolved (yet).
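If the second issue is the one biting you, a possible workaround is migrating off the legacy connector to the KafkaSource API that replaces FlinkKafkaConsumer. A minimal sketch, assuming hypothetical broker, topic, and group names (not taken from the question), which requires the flink-connector-kafka dependency on the classpath:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical broker/topic/group names; replace with your own.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("events")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        // ... rest of the pipeline ...
        env.execute("kafka-source-job");
    }
}
```

The legacy FlinkKafkaProducer has an analogous replacement in KafkaSink.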

Upvotes: 1
