Joe Rakhimov

Reputation: 5083

java.lang.ClassNotFoundException: org.apache.kafka.common.metrics.MetricsContext

I am currently learning Kafka. Following a video course, I am trying to build a filter for Twitter tweets with Kafka Streams:

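import java.util.Properties;

import com.google.gson.JsonParser;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsFilterTweet {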

    public static void main(String[] args) {

        // create properties
        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "demo-kafka-streams");
        properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
        properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

        // create a topology
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // input topic
        KStream<String,String> inputTopic = streamsBuilder.stream("twitter_tweets");
        KStream<String,String> filteredStream = inputTopic.filter(
                (k,jsonValue)->extractUserFollowersInTweets(jsonValue)>10000
                        // keep only tweets whose user has more than 10000 followers
        );
        filteredStream.to("important_tweets");

        // build topology
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);

        // start our streams application
        kafkaStreams.start();

    }

    private static JsonParser jsonParser = new JsonParser();
    private static int extractUserFollowersInTweets(String tweetJson) {
        try {
            return jsonParser.parse(tweetJson)
                    .getAsJsonObject()
                    .get("user")
                    .getAsJsonObject()
                    .get("followers_count")
                    .getAsInt();
        } catch (NullPointerException e){
            return 0;
        }
    }

} 

When I try to run this code, I am getting this error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/metrics/MetricsContext
    at tutorial4.StreamsFilterTweet.main(StreamsFilterTweet.java:35)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.metrics.MetricsContext
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
    ... 1 more

According to the stacktrace, this line is causing the problem:

KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);

I am using Maven and have the following dependencies:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
</dependency>

<dependency>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.8.1</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.6.0</version>
</dependency>

<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.6</version>
</dependency>

I could not figure out why this is happening. How can I solve it?

Upvotes: 2

Views: 7206

Answers (2)

Ismail

Reputation: 2992

The problem is not the kafka-streams version by itself but a version conflict between your kafka-streams (2.6.0) and kafka-clients (2.0.0) dependencies. Either upgrade kafka-clients to 2.6.0 or downgrade kafka-streams to 2.0.0 so that both versions match.

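Kafka Streams depends on kafka-clients, so if you declare both as dependencies with different versions, you will run into errors like this one. Maven prefers the kafka-clients 2.0.0 that you declared directly over the 2.6.0 that kafka-streams would bring in transitively, and that older jar does not contain the MetricsContext class that kafka-streams 2.6.0 expects.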

Finally, judging by your code, you do not use kafka-clients directly, so you can remove that dependency from your POM; kafka-streams pulls in a matching version transitively.
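If you want to confirm which jar a class actually comes from, a small diagnostic along these lines may help. It is only a sketch: the class name `ClasspathCheck` and the helper `describe` are made up for illustration, and it relies only on standard JDK reflection calls. It reports the location a class is loaded from, or says that the class is not on the classpath.

```java
import java.security.CodeSource;

public class ClasspathCheck {

    // Hypothetical helper: describes where the named class is loaded from,
    // or reports that it cannot be found on the classpath.
    public static String describe(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class<?> clazz = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            // Classes that ship with the JDK have no code source
            String location = (source == null || source.getLocation() == null)
                    ? "JDK runtime"
                    : source.getLocation().toString();
            return className + " -> " + location;
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " -> NOT FOUND (" + e.getClass().getSimpleName() + ")";
        }
    }

    public static void main(String[] args) {
        // The class the stack trace reports as missing
        System.out.println(describe("org.apache.kafka.common.metrics.MetricsContext"));
        // A class from kafka-streams, to see which jar version is in use
        System.out.println(describe("org.apache.kafka.streams.KafkaStreams"));
    }
}
```

Run with the same classpath as your application, the printed jar paths normally include the version number, so a mismatch between kafka-streams and kafka-clients should be visible directly.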

Upvotes: 9

Joe Rakhimov

Reputation: 5083

I found a solution to this problem: I switched to an older version of the Kafka Streams library:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.0.0</version>
</dependency>

Upvotes: 0
