annedroiid

Reputation: 6667

Can't get KafkaProducer/KafkaConsumer to work in Scala

I'm trying to create a simple KafkaProducer and KafkaConsumer so I can send data to a topic on a broker and then verify that the data was received. Below are the two methods I use to create the consumer and producer, along with the test that sends the message. The send method takes at least 20 seconds to complete, and as far as I can tell the consumer.poll method never returns; the longest I've waited is 10 minutes.

Does anyone have a suggestion as to what I'm doing wrong? Is there some property for the producer/consumer that I'm not setting up correctly? Those properties are copied directly from the docs, so I don't understand why they won't work.

KafkaProducer docs
KafkaConsumer docs

"verify we can send to producer" in {
    val consumer = createKafkaConsumer("address:9002")
    val producer = createKafkaProducer("address:9002")

    val message = "I am a message"
    val record = new ProducerRecord[String, String]("myTopic", message)

    producer.send(record)
    TimeUnit.SECONDS.sleep(5)
    val records = consumer.poll(5000)
    println("records: "+records)
    consumer.close()
    producer.close()
}

def createKafkaProducer(kafka: String): KafkaProducer[String,String] = {
    val props = new Properties()
    props.put("bootstrap.servers", kafka)
    props.put("acks", "all")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    new KafkaProducer[String,String](props)
}

def createKafkaConsumer(kafka: String): KafkaConsumer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", kafka)
    props.put("group.id", "test")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("session.timeout.ms", "30000")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("myTopic"))
    consumer
}

Edit: I've updated my code so that I now get the response from the send method, and it turns out that the call times out with org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
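
For anyone reproducing the timeout mentioned in the edit above, here is a minimal sketch of how the result of send could be awaited so that the failure shows up quickly instead of after 60 seconds. It assumes the kafka-clients library is on the classpath; the names SendCheck, producerProps and sendAndWait are hypothetical helpers, not part of the original code. The max.block.ms producer setting bounds how long send may block while fetching metadata, and its default of 60000 ms matches the message above.

```scala
import java.util.Properties
import java.util.concurrent.{ExecutionException, TimeUnit}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

// Hypothetical helper object, for illustration only.
object SendCheck {

  // Same producer settings as in the question, plus max.block.ms so that
  // an unreachable broker is reported after maxBlockMs instead of 60 s.
  def producerProps(bootstrap: String, maxBlockMs: Long): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrap)
    props.setProperty("acks", "all")
    props.setProperty("max.block.ms", maxBlockMs.toString)
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  // Blocks until the broker acknowledges the record or the timeout expires.
  // Returns Left with the underlying error rather than throwing.
  def sendAndWait(
      producer: KafkaProducer[String, String],
      record: ProducerRecord[String, String],
      timeoutSeconds: Long
  ): Either[Throwable, RecordMetadata] =
    try Right(producer.send(record).get(timeoutSeconds, TimeUnit.SECONDS))
    catch {
      // Errors raised while sending are wrapped by the Future; unwrap them.
      case e: ExecutionException => Left(Option(e.getCause).getOrElse(e))
      case e: Exception          => Left(e)
    }
}
```

With this in place, the test could create the producer with new KafkaProducer[String, String](SendCheck.producerProps("address:9002", 5000L)) and print the Left value when the send fails.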

Upvotes: 1

Views: 789

Answers (1)

annedroiid

Reputation: 6667

It turns out I had a DNS issue that meant I wasn't actually connecting to the broker. Fixing this allowed the messages to go through; there was nothing wrong with the config.
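
A quick way to rule out this kind of problem before looking at the Kafka config is to check that the broker's host name resolves and that the port accepts a TCP connection. The following is a rough sketch using only the JDK; BrokerCheck and its methods are hypothetical names, and a successful TCP connect only shows that something is listening, not that it is a healthy broker.

```scala
import java.io.IOException
import java.net.{InetAddress, InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper object, for illustration only.
object BrokerCheck {

  // Splits "host:port" into its parts; None if the format or port is invalid.
  def parseHostPort(address: String): Option[(String, Int)] =
    address.split(":") match {
      case Array(host, port) if host.nonEmpty =>
        Try(port.toInt).toOption.filter(p => p > 0 && p <= 65535).map(p => (host, p))
      case _ => None
    }

  // Resolves the host name with the JVM's resolver, which is the same
  // lookup the Kafka client depends on for bootstrap.servers.
  def resolve(host: String): Either[String, Seq[String]] =
    Try(InetAddress.getAllByName(host).map(_.getHostAddress).toSeq)
      .toEither
      .left
      .map(e => s"cannot resolve $host: ${e.getMessage}")

  // Attempts a plain TCP connection with a timeout in milliseconds.
  def canConnect(host: String, port: Int, timeoutMs: Int): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException => false // unresolved host, refused, or timed out
    } finally socket.close()
  }
}
```

For the address used in the question, BrokerCheck.parseHostPort("address:9002") yields the host and port, after which resolve and canConnect can be called in turn. If resolve returns a Left, the problem lies in name resolution rather than in the producer or consumer properties.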

Upvotes: 1
