I'm in the process of upgrading a Flink 1.9.1 cluster to Flink 1.15.0, which required me to update the Flink API calls in the job code.
The updated job runs fine on Flink 1.15.0, except that it does not emit any application-level metrics.
According to the documentation, metrics are registered as follows:
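private transient Counter numEvents; // created in open(), once the runtime context is available
@Override
public void open(Configuration parameters) {
    numEvents = getRuntimeContext().getMetricGroup().counter("foo.bar.numEvents");
}
// and later, e.g. in map():
numEvents.inc();
// etc.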
The flink-metrics-statsd plugin then takes care of sending the metrics to a StatsD endpoint.
Using tcpdump on the StatsD endpoint, I have confirmed that system-level metrics are being sent; however, the metrics I explicitly registered in the code are nowhere to be seen.
The same metric-registration code ran fine on Flink 1.9.1 and emitted all of these metrics, and according to the 1.15.0 documentation the code appears to be correct: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/metrics/
In the Flink logs, I can see the following error:
2022-07-19 16:50:07,975 WARN org.apache.flink.runtime.metrics.MetricRegistryImpl [] - Error while reporting metrics
java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Double (java.lang.String and java.lang.Double are in module java.base of loader 'bootstrap')
at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:37) ~[?:?]
at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:27) ~[?:?]
at org.apache.flink.metrics.statsd.StatsDReporter.reportGauge(StatsDReporter.java:136) ~[?:?]
at org.apache.flink.metrics.statsd.StatsDReporter.report(StatsDReporter.java:106) ~[?:?]
at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:495) [flink-dist-1.15.0.jar:1.15.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
I'm not sure if that is related.
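One way the warning could be related, sketched as a simplified model rather than Flink's actual reporter code: assume the reporter reads all gauges before any counters in a single pass with no per-metric error handling, and that the scheduled reporter task only logs a failed pass. Under those assumptions, a single gauge whose underlying value has the wrong type (like the Kafka gauge in the trace above) throws a ClassCastException before any counter is reached, so the user-defined counters are never sent. The class, method, and gauge names here (ReporterPassSketch, reportOnce, runReporterTask, kafka.some-gauge) are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified model of a metric reporter pass (not Flink's code).
public class ReporterPassSketch {

    // Assumption: gauges are read first, then counters, in one pass
    // with no per-metric error handling.
    public static List<String> reportOnce(Map<String, Supplier<Double>> gauges,
                                          Map<String, Long> counters) {
        List<String> sent = new ArrayList<>();
        for (Map.Entry<String, Supplier<Double>> g : gauges.entrySet()) {
            // A gauge that throws here aborts the rest of the pass.
            sent.add(g.getKey() + ":" + g.getValue().get() + "|g");
        }
        for (Map.Entry<String, Long> c : counters.entrySet()) {
            sent.add(c.getKey() + ":" + c.getValue() + "|c");
        }
        return sent;
    }

    // Assumption: the scheduled task logs a failed pass instead of rethrowing,
    // so nothing from that pass (including counters) is sent.
    public static List<String> runReporterTask(Map<String, Supplier<Double>> gauges,
                                               Map<String, Long> counters) {
        try {
            return reportOnce(gauges, counters);
        } catch (RuntimeException e) {
            System.out.println("WARN Error while reporting metrics: " + e);
            return new ArrayList<>();
        }
    }

    public static void main(String[] args) {
        Map<String, Long> counters = new LinkedHashMap<>();
        counters.put("foo.bar.numEvents", 42L);

        Map<String, Supplier<Double>> healthy = new LinkedHashMap<>();
        healthy.put("kafka.some-gauge", () -> 1.0);

        // A wrapper that casts an underlying Object value to Double,
        // but the underlying value is actually a String.
        Supplier<Object> rawValue = () -> "not-a-number";
        Map<String, Supplier<Double>> broken = new LinkedHashMap<>();
        broken.put("kafka.some-gauge", () -> (Double) rawValue.get());

        System.out.println(runReporterTask(healthy, counters)); // gauge and counter sent
        System.out.println(runReporterTask(broken, counters));  // nothing sent
    }
}
```

In this model the counter is lost only because of the unrelated broken gauge; with a healthy gauge it is reported normally.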
It would be great to hear from anyone who knows what's going on here. Thanks. 🙏
You're either suffering from one or both of these issues: