Rahul Gupta

Reputation: 3

Kafka Producer Java API is not distributing messages to all topic partitions

I am very new to Kafka, and today I tried writing a Java producer that sends messages to the different partitions of a Kafka topic.

First I created a package raggieKafka, in which I created two classes: TestProducer and SimplePartitioner.

The TestProducer class has the following code:

package raggieKafka;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer{

    public static void main(String args[]) throws Exception
    {
        long events = 0;

        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        events = Integer.parseInt(reader.readLine());
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("topic.metadata.refresh.interval.ms", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "raggieKafka.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> prod = new Producer<String, String>(config);

        for(long i = 0; i < events; i++)
        {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com, " + ip;
            KeyedMessage<String,String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
            prod.send(data);
        }
        prod.close();
    }
}

and the SimplePartitioner class has the following code:

package raggieKafka;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner{

    public SimplePartitioner(VerifiableProperties props)
    {

    }

    public int partition(Object Key, int a_numPartitions)
    {
        int partition = 0;
        String stringKey = (String) Key;
        int offset = stringKey.indexOf(stringKey);

        if(offset > 0)
        {
            partition = Integer.parseInt(stringKey.substring(offset+1)) % a_numPartitions;
        }
        return partition;
    }   
}

Before compiling these Java programs, I created the topic on the Kafka broker:

C:\kafka_2.11-0.9.0.1>.\bin\windows\kafka-topics.bat --create --topic page_visits --zookeeper localhost:2181 --partitions 5 --replication-factor 1
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "page_visits".

Now when I run the Java program, it puts all the messages into only one partition, page_visits-0, while all the other partitions remain empty.

Can someone tell me why my Java producer is NOT distributing my messages to the other partitions?

In fact, I searched on Google and then added one more property:

props.put("topic.metadata.refresh.interval.ms", "1");

but the producer still isn't sending messages to all the partitions.

PLEASE HELP.

Upvotes: 0

Views: 945

Answers (1)

avr

Reputation: 4893

Your SimplePartitioner code has a bug in the following line:

int offset = stringKey.indexOf(stringKey);

Searching a string for itself always matches at index 0, so offset is always 0. Since it is never greater than 0, your if block never executes, and the method always returns partition 0.
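You can check this without Kafka. Here is a minimal stand-alone sketch; the class name OffsetDemo, the helper offsetOf and the sample key are only for illustration and are not part of your code or the Kafka API:

```java
// Stand-alone check of the expression used in SimplePartitioner.
class OffsetDemo {

    // Same expression as in the question: search the key for itself.
    static int offsetOf(String key) {
        return key.indexOf(key);
    }

    public static void main(String[] args) {
        // A string always contains itself starting at index 0.
        System.out.println(offsetOf("192.168.2.17"));  // prints 0
        System.out.println(offsetOf("192.168.2.254")); // prints 0
    }
}
```

Whatever IP you pass in, the result is 0, so the condition offset > 0 is never true.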

Solution: since your key is an IP address, the following change should work as expected.

int offset = stringKey.lastIndexOf('.');
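As a rough sketch of what the partition logic does with that change, here it is pulled out into a plain static method so it can be run without a broker. The names PartitionDemo and partitionFor and the sample keys are assumptions for illustration; in your project the body would stay inside SimplePartitioner.partition:

```java
// Partition logic from the question with lastIndexOf('.') applied,
// isolated from Kafka so it can be tried on its own.
class PartitionDemo {

    static int partitionFor(String key, int numPartitions) {
        int partition = 0;
        // Position of the dot before the last octet of the IP address.
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
            // Use the last octet modulo the partition count.
            partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }

    public static void main(String[] args) {
        // 5 partitions, as in the kafka-topics command from the question.
        System.out.println(partitionFor("192.168.2.17", 5));  // 17 % 5  -> prints 2
        System.out.println(partitionFor("192.168.2.254", 5)); // 254 % 5 -> prints 4
        System.out.println(partitionFor("192.168.2.0", 5));   // 0 % 5   -> prints 0
    }
}
```

Because the last octet comes from rnd.nextInt(255), the remainders should spread across all 5 partitions.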

Hope this helps!

Upvotes: 3
