Reputation: 80176
I expect the consume test to read just one message and then shut down. However, it does not, even though I call consumer.shutdown().
Why is that?
Test
/**
 * Tests for {@link App}: one checks that publishing reports success, the other
 * publishes a message and checks that consuming returns it.
 */
public class AppTest
{
    App app = new App();

    /** Publishing to "temptopic" is expected to report success. */
    @org.junit.Test
    public void publish()
    {
        boolean published = app.publish("temptopic", "message");
        assertTrue(published);
    }

    /** A message published to "temptopic" is expected to come back from consume. */
    @org.junit.Test
    public void consume()
    {
        String expected = "message";
        app.publish("temptopic", expected);

        String received = app.consume("temptopic", "tempgroup");
        assertEquals(expected, received);
    }
}
Class Under Test
public class App
{
public boolean publish(String topicName, String message) {
Properties p=new Properties();
p.put("bootstrap.servers","localhost:9092");
p.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
p.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
p.put("request.required.acks","1");
KafkaProducer producer = new KafkaProducer<String,String>(p);
ProducerRecord record=new ProducerRecord(topicName,"mykey",message);
Future<RecordMetadata> future = producer.send(record);
RecordMetadata recordMetadata=null;
try {
recordMetadata = future.get();
} catch (Exception e) {
return false;
}
return true;
}
public String consume(String topicName, String groupId)
{
Properties p=new Properties();
p.put("zookeeper.connect","localhost:2181");
p.put("group.id","groupId");
p.put("zookeeper.session.timeout.ms","500");
p.put("zookeeper.sync.time.ms","250");
p.put("auto.commit.interval.ms", "1000");
ConsumerConnector consumer=null;
String result = "";
try
{
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(p));
Map<String, Integer> topicPartitionCount = new HashMap<String, Integer>();
topicPartitionCount.put(topicName, Integer.valueOf(1));//use 1 thread to read
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicPartitionCount);
List<KafkaStream<byte[], byte[]>> streams = messageStreams.get(topicName);
for (KafkaStream stream : streams) {
ConsumerIterator it = stream.iterator();
while (it.hasNext()) {
MessageAndMetadata msg = it.next();
String strMsg = new String((byte[]) msg.message());
System.out.println("received: " + strMsg);
result += strMsg;
}
}
}
finally
{
if (consumer != null)
{
consumer.shutdown();
}
}
return result;
}
}
Upvotes: 1
Views: 3199
Reputation: 6562
The ConsumerIterator will by default block indefinitely and just wait for the next message. To get it out of the while loop you would need to interrupt the thread.
There is a consumer property, "consumer.timeout.ms", which can be set to a timeout in milliseconds; once it elapses with no new message, the iterator throws a ConsumerTimeoutException to the consumer. However, this may not be the most practical solution for unit testing.
Upvotes: 4