Meeran Mohideen
Meeran Mohideen

Reputation: 167

Kafka consumer in java not consuming messages

I am trying to a kafka consumer to get messages which are produced and posted to a topic in Java. My consumer goes as follows.

consumer.java

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;



public class KafkaConsumer extends  Thread {
    final static String clientId = "SimpleConsumerDemoClient";
    final static String TOPIC = " AATest";
    ConsumerConnector consumerConnector;


    public static void main(String[] argv) throws UnsupportedEncodingException {
        KafkaConsumer KafkaConsumer = new KafkaConsumer();
        KafkaConsumer.start();
    }

    public KafkaConsumer(){
        Properties properties = new Properties();
        properties.put("zookeeper.connect","10.200.208.59:2181");
        properties.put("group.id","test-group");      
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream =  consumerMap.get(TOPIC).get(0);
        System.out.println(stream);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while(it.hasNext())
            System.out.println("from it");
            System.out.println(new String(it.next().message()));

    }

    private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
        for(MessageAndOffset messageAndOffset: messageSet) {
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            System.out.println(new String(bytes, "UTF-8"));
        }
    }
}

When I run the above code I am getting nothing in the console wheres the java producer program behind the screen is posting data continously under the 'AATest' topic. Also the in the zookeeper console I am getting the following lines when I try running the above consumer.java

[2015-04-30 15:57:31,284] INFO Accepted socket connection from /10.200.208.59:51780 (org.apache.zookeeper.
server.NIOServerCnxnFactory)
[2015-04-30 15:57:31,284] INFO Client attempting to establish new session at /10.200.208.59:51780 (org.apa
che.zookeeper.server.ZooKeeperServer)
[2015-04-30 15:57:31,315] INFO Established session 0x14d09cebce30007 with negotiated timeout 6000 for clie
nt /10.200.208.59:51780 (org.apache.zookeeper.server.ZooKeeperServer)

Also when I run a separate console-consumer pointing to the AATest topic, I am getting all the data produced by the producer to that topic.

Both consumer and broker are in the same machine whereas the producer is in different machine. This actually resembles this question. But going through it dint help me. Please help me.

Upvotes: 2

Views: 31605

Answers (3)

Anirudh Simha
Anirudh Simha

Reputation: 360

There might also be a case where kafka takes a long time to rebalance consumer groups when a new consumer is added to the same group id. Check kafka logs to see if the group is rebalanced after starting your consumer

Upvotes: 0

Zana Simsek
Zana Simsek

Reputation: 99

In our case, we solved our problem with the following steps:

The first thing we found is that there is an config called 'retry' for KafkaProducer and its default value means 'No Retry'. Also, send method of the KafkaProducer is async without calling the get method of the send method's result. In this way, there is no guarantee to delivery produced messages to the corresponding broker without retry. So, you have to increase it a bit or can use idempotence or transactional mode of KafkaProducer.

The second case is about the Kafka and Zookeeper version. We chose the 1.0.0 version of the Kafka and Zookeeper 3.4.4. Especially, Kafka 1.0.0 had an issue about the connectivity with Zookeeper. If Kafka loose its connection to the Zookeeper with an unexpected exception, it looses the leadership of the partitions which didn't synced yet. There is an bug topic about this issue : https://issues.apache.org/jira/browse/KAFKA-2729 After we found the corresponding logs at Kafka log which indicates same issue at topic above, we upgraded our Kafka broker version to the 1.1.0.

It is also important point to notice that small sized the partitions (like 100 or less), increases the throughput of the producer so if there is no enough consumer then the available consumer fall into the thread stuck on results with delayed messages(we measured delay with minutes, approximately 10-15 minutes). So you need to balance and configure the partition size and thread counts of your application correctly according to your available resources.

Upvotes: 1

prayagupadhyay
prayagupadhyay

Reputation: 31192

Different answer but it happened to be initial offset (auto.offset.reset) for a consumer in my case. So, setting up auto.offset.reset=earliest fixed the problem in my scenario. Its because I was publishing event first and then starting a consumer.

By default, consumer only consumes events published after it started because auto.offset.reset=latest by default.

eg. consumer.properties

bootstrap.servers=localhost:9092
enable.auto.commit=true
auto.commit.interval.ms=1000
session.timeout.ms=30000
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Test

class KafkaEventConsumerSpecs extends FunSuite {

  case class TestEvent(eventOffset: Long, hashValue: Long, created: Date, testField: String) extends BaseEvent

  test("given an event in the event-store, consumes an event") {

    EmbeddedKafka.start()

    //PRODUCE
    val event = TestEvent(0l, 0l, new Date(), "data")
    val config = new Properties() {
      {
        load(this.getClass.getResourceAsStream("/producer.properties"))
      }
    }
    val producer = new KafkaProducer[String, String](config)

    val persistedEvent = producer.send(new ProducerRecord(event.getClass.getSimpleName, event.toString))

    assert(persistedEvent.get().offset() == 0)
    assert(persistedEvent.get().checksum() != 0)

    //CONSUME
    val consumerConfig = new Properties() {
      {
        load(this.getClass.getResourceAsStream("/consumer.properties"))
        put("group.id", "consumers_testEventsGroup")
        put("client.id", "testEventConsumer")
      }
    }

    assert(consumerConfig.getProperty("group.id") == "consumers_testEventsGroup")

    val kafkaConsumer = new KafkaConsumer[String, String](consumerConfig)

    assert(kafkaConsumer.listTopics().asScala.map(_._1).toList == List("TestEvent"))

    kafkaConsumer.subscribe(Collections.singletonList("TestEvent"))

    val events = kafkaConsumer.poll(1000)
    assert(events.count() == 1)

    EmbeddedKafka.stop()
  }
}

But if consumer is started first and then published, the consumer should be able to consume the event without auto.offset.reset required to be set to earliest.

References for kafka 0.10

https://kafka.apache.org/documentation/#consumerconfigs

Upvotes: 7

Related Questions