Reputation: 157
I am using Apache Flink version 1.13.1
I wrote a custom metrics reporter, but the JobManager does not seem to recognise it. On startup, the JobManager shows the following warning log:
2021-08-25 14:54:06,243 WARN org.apache.flink.runtime.metrics.ReporterSetup [] - The reporter factory (org.apache.flink.metrics.kafka.KafkaReporterFactory) could not be found for reporter kafka. Available factories: [org.apache.flink.metrics.slf4j.Slf4jReporterFactory, org.apache.flink.metrics.datadog.DatadogHttpReporterFactory, org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory, org.apache.flink.metrics.graphite.GraphiteReporterFactory, org.apache.flink.metrics.statsd.StatsDReporterFactory, org.apache.flink.metrics.prometheus.PrometheusReporterFactory, org.apache.flink.metrics.jmx.JMXReporterFactory, org.apache.flink.metrics.influxdb.InfluxdbReporterFactory].
2021-08-25 14:54:06,245 INFO org.apache.flink.runtime.metrics.MetricRegistryImpl [] - No metrics reporter configured, no metrics will be exposed/reported.
I have a folder within the Flink plugins folder called metrics-kafka
which contains the packaged jar for the metrics reporter. I have also copied this jar to the lib folder, both of which did not work. See the configuration and code used below.
Flink configuration file:
metrics.reporter.kafka.factory.class: org.apache.flink.metrics.kafka.KafkaReporterFactory
metrics.reporter.kafka.class: org.apache.flink.metrics.kafka.KafkaReporter
metrics.reporter.kafka.interval: 15 SECONDS
Metrics reporter factory class:
package org.apache.flink.metrics.kafka
import org.apache.flink.metrics.reporter.{InterceptInstantiationViaReflection, MetricReporter, MetricReporterFactory}
import java.util.Properties
@InterceptInstantiationViaReflection(reporterClassName = "org.apache.flink.metrics.kafka.KafkaReporter")
class KafkaReporterFactory extends MetricReporterFactory{
override def createMetricReporter(properties: Properties): MetricReporter = {
new KafkaReporter()
}
}
Metrics reporter class:
package org.apache.flink.metrics.kafka
import org.apache.flink.metrics.MetricConfig
import org.apache.flink.metrics.reporter.{InstantiateViaFactory, Scheduled}
@InstantiateViaFactory(factoryClassName = "org.apache.flink.metrics.kafka.KafkaReporterFactory")
class KafkaReporter extends MyAbstractReporter with Scheduled{
...
}
Upvotes: 3
Views: 1536
Reputation: 157
I found that I needed to add a file called org.apache.flink.metrics.reporter.MetricReporterFactory
with the contents org.apache.flink.metrics.kafka.KafkaReporterFactory
in /resources/META-INF/services/
.
Upvotes: 7