Aravind Yarram

Reputation: 80176

Why isn't this Kafka consumer shutting down?

I expect the consume test to read just one message and then shut down. However, it never finishes, even though I call consumer.shutdown(). Why?

Test

public class AppTest
{
    App app=new App();

    @org.junit.Test
    public void publish()
    {

        assertTrue(app.publish("temptopic","message"));
    }

    @org.junit.Test
    public void consume()
    {
        app.publish("temptopic","message");
        assertEquals("message",app.consume("temptopic","tempgroup"));
    }
}

Class Under Test

public class App 
{
    public boolean publish(String topicName, String message) {

        Properties p=new Properties();
        p.put("bootstrap.servers","localhost:9092");
        p.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        p.put("acks","1"); // the new KafkaProducer reads "acks"; "request.required.acks" belongs to the old producer

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p);
        try {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, "mykey", message);

            // Block until the broker acknowledges the record
            producer.send(record).get();
            return true;
        } catch (Exception e) {
            return false;
        } finally {
            producer.close(); // release the producer's threads and buffers
        }
    }

    public String consume(String topicName, String groupId)
    {
        Properties p=new Properties();
        p.put("zookeeper.connect","localhost:2181");
        p.put("group.id", groupId); // use the method argument, not the literal string "groupId"
        p.put("zookeeper.session.timeout.ms","500");
        p.put("zookeeper.sync.time.ms","250");
        p.put("auto.commit.interval.ms", "1000");

        ConsumerConnector consumer=null;
        String result = "";

        try
        {
            consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(p));

            Map<String, Integer> topicPartitionCount = new HashMap<String, Integer>();
            topicPartitionCount.put(topicName, Integer.valueOf(1));//use 1 thread to read

            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicPartitionCount);

            List<KafkaStream<byte[], byte[]>> streams = messageStreams.get(topicName);


            for (KafkaStream stream : streams) {
                ConsumerIterator it = stream.iterator();
                while (it.hasNext()) {
                    MessageAndMetadata msg = it.next();
                    String strMsg = new String((byte[]) msg.message());
                    System.out.println("received: " + strMsg);
                    result += strMsg;
                }
            }
        }
        finally
        {
            if (consumer != null)
            {
                consumer.shutdown();
            }
        }
        return result;
    }
}

Upvotes: 1

Views: 3199

Answers (1)

Lundahl

Reputation: 6562

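By default the ConsumerIterator blocks indefinitely while it waits for the next message. After the first message is read, it.hasNext() never returns, so the while loop never ends and the finally block with consumer.shutdown() is never reached. To get out of the loop you would need to interrupt the thread.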
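
To illustrate the interrupt route, here is a minimal sketch that uses only the JDK rather than real Kafka classes. It assumes a BlockingQueue as a stand-in for the stream, with queue.take() playing the role of the blocking iterator. The names BlockingReadSketch, readUntilInterrupted and runDemo are made up for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Assumption: a BlockingQueue stands in for the Kafka stream; this is not Kafka API.
class BlockingReadSketch {

    // Appends messages until the calling thread is interrupted.
    static String readUntilInterrupted(BlockingQueue<String> queue) {
        StringBuilder result = new StringBuilder();
        try {
            while (true) {
                result.append(queue.take()); // blocks while the queue is empty
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag, then leave the loop
        }
        return result.toString();
    }

    // Queues one message, lets a reader thread consume it, then interrupts the reader.
    static String runDemo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("message");
        String[] holder = new String[1];
        Thread reader = new Thread(() -> holder[0] = readUntilInterrupted(queue));
        reader.start();
        try {
            while (!queue.isEmpty()) {
                Thread.sleep(10); // wait until the reader has taken the message
            }
            reader.interrupt(); // without this, join() would never return
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return holder[0];
    }

    public static void main(String[] args) {
        System.out.println("received: " + runDemo());
    }
}
```

In a test this would mean running the consume call on its own thread and interrupting it once the expected message has arrived, which is workable but adds threading to the test.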

There is a consumer property, "consumer.timeout.ms", which makes the iterator throw a timeout exception when no message arrives within the given number of milliseconds. By default no timeout is set, which is why the iterator waits forever. This may not be the most practical solution for unit testing, though.
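
A rough model of the timeout behaviour, again using only the JDK and not Kafka itself: it assumes queue.poll with a timeout as a stand-in for the iterator, and ReadTimeoutException is a hypothetical placeholder for the consumer's timeout exception. In the real consumer the equivalent would presumably be to put "consumer.timeout.ms" into the Properties and catch the timeout exception (kafka.consumer.ConsumerTimeoutException, as far as I know) around the while loop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Assumption: a BlockingQueue stands in for the Kafka stream; this is not Kafka API.
class TimeoutReadSketch {

    // Hypothetical stand-in for the consumer's timeout exception.
    static class ReadTimeoutException extends RuntimeException {
        ReadTimeoutException(String msg) { super(msg); }
    }

    // Waits up to timeoutMs for the next message; throws if none arrives in time.
    static String next(BlockingQueue<String> queue, long timeoutMs) {
        try {
            String msg = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (msg == null) {
                throw new ReadTimeoutException("no message within " + timeoutMs + " ms");
            }
            return msg;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // Same shape as the loop in consume(), except that the timeout ends it.
    static String consumeAll(BlockingQueue<String> queue, long timeoutMs) {
        StringBuilder result = new StringBuilder();
        try {
            while (true) {
                result.append(next(queue, timeoutMs));
            }
        } catch (ReadTimeoutException e) {
            // Expected: nothing more arrived in time, so stop reading.
        }
        return result.toString();
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("message");
        System.out.println("received: " + consumeAll(queue, 100)); // prints "received: message"
    }
}
```

The catch sits outside the loop so that the code after it, in the question's case the finally block with consumer.shutdown(), gets to run. The cost is that every test waits out the full timeout before it can finish.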

Upvotes: 4
