himanshuIIITian

Reputation: 6105

How to Test Kafka Consumer

I have a Kafka consumer (written in Scala) that reads the latest records from Kafka. It looks like this:

// Consumer settings: String keys and values, consumer group "something",
// and "latest" as the auto.offset.reset policy.
val consumerProperties = new Properties()
Seq(
  "bootstrap.servers"  -> "localhost:9092",
  "key.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id"           -> "something",
  "auto.offset.reset"  -> "latest"
).foreach { case (key, value) => consumerProperties.put(key, value) }

// Build the consumer from those settings and subscribe it to the single topic "topic".
val consumer = new KafkaConsumer[String, String](consumerProperties)
consumer.subscribe(java.util.Collections.singletonList("topic"))

Now I want to write an integration test for it. Is there a recommended way, or an established best practice, for testing Kafka consumers?

Upvotes: 1

Views: 14304

Answers (1)

prayagupadhyay

Reputation: 31262

  1. For integration tests, start ZooKeeper and Kafka programmatically.

    1.1 Start ZooKeeper (`ZooKeeperServer`)

    /**
     * Starts a standalone ZooKeeper server for integration tests.
     *
     * @param zooKeeperPort  port on which ZooKeeper accepts client connections
     * @param zkLogsDir      directory used as both the snapshot dir and the transaction-log dir
     * @param tickTime       ZooKeeper tick length in milliseconds (default 2000, as before)
     * @param maxClientCnxns value passed to `configure` as the client-connection limit (default 1024)
     * @return the started connection factory; the caller is responsible for calling
     *         `shutdown()` on it when the tests are finished
     */
    def startZooKeeper(zooKeeperPort: Int,
                       zkLogsDir: Directory,
                       tickTime: Int = 2000,
                       maxClientCnxns: Int = 1024): ServerCnxnFactory = {
      val dataDir = zkLogsDir.toFile.jfile
      val zkServer = new ZooKeeperServer(dataDir, dataDir, tickTime)

      val factory = ServerCnxnFactory.createFactory
      // Binds on all interfaces (0.0.0.0), as the original helper did.
      factory.configure(new InetSocketAddress("0.0.0.0", zooKeeperPort), maxClientCnxns)
      try factory.startup(zkServer)
      catch {
        case scala.util.control.NonFatal(e) =>
          // configure() has already bound the port; release it before propagating the
          // failure so a subsequent attempt is not blocked by a leaked socket.
          factory.shutdown()
          throw e
      }

      factory
    }
    

    1.2 Start Kafka (`KafkaServer`)

    /**
     * Settings for an embedded Kafka broker used in tests.
     *
     * @param streamTcpPort      port the Kafka broker listens on (default 9092)
     * @param streamStateTcpPort ZooKeeper port the broker connects to (default 2181)
     * @param stream             stream name (presumably the topic under test; not read by startKafkaBroker)
     * @param numOfPartition     requested partition count (default 1)
     * @param nodes              extra broker properties, applied after startKafkaBroker's own settings
     */
    case class StreamConfig(
        streamTcpPort: Int = 9092,
        streamStateTcpPort: Int = 2181,
        stream: String,
        numOfPartition: Int = 1,
        nodes: Map[String, String] = Map.empty
    )
    
    /**
     * Starts a single embedded Kafka broker for integration tests.
     *
     * The broker connects to ZooKeeper at `localhost:<config.streamStateTcpPort>` and listens on
     * `config.streamTcpPort`. Entries in `config.nodes` are applied last, so they override any
     * setting chosen here.
     *
     * @param config      ports, partition count and extra broker properties
     * @param kafkaLogDir directory for the broker's log segments
     * @return the started broker; the caller should call `shutdown()` on it after the tests
     */
    def startKafkaBroker(config: StreamConfig,
                       kafkaLogDir: Directory): KafkaServer = {

      val syncServiceAddress = s"localhost:${config.streamStateTcpPort}"

      val properties: Properties = new Properties
      properties.setProperty("zookeeper.connect", syncServiceAddress)
      properties.setProperty("broker.id", "0")
      // NOTE(review): host.name / advertised.host.name / port are the legacy listener settings;
      // newer Kafka versions configure this via `listeners` / `advertised.listeners`.
      properties.setProperty("host.name", "localhost")
      properties.setProperty("advertised.host.name", "localhost")
      properties.setProperty("port", config.streamTcpPort.toString)
      properties.setProperty("auto.create.topics.enable", "true")
      // Auto-created topics get the partition count requested in the config
      // (numOfPartition was previously ignored).
      properties.setProperty("num.partitions", config.numOfPartition.toString)
      // With one broker, the default replication factor (3) of Kafka's internal offsets topic
      // cannot be met, so consumer groups would never find a coordinator.
      properties.setProperty("offsets.topic.replication.factor", "1")
      properties.setProperty("log.dir", kafkaLogDir.toAbsolute.path)
      // Flush after every message so data is on disk immediately in tests.
      properties.setProperty("log.flush.interval.messages", "1")
      properties.setProperty("log.cleaner.dedupe.buffer.size", "1048577")

      config.nodes.foreach {
        case (key, value) => properties.setProperty(key, value)
      }

      val broker = new KafkaServer(new KafkaConfig(properties))
      broker.startup()

      println(s"KafkaStream Broker started at ${properties.getProperty("host.name")}:${properties.getProperty("port")} at ${kafkaLogDir.toFile}")
      broker
    }

  2. Emit some events to the topic using a `KafkaProducer`.

  3. Consume them with your consumer and verify that it works.

Alternatively, you can use the scalatest-eventstream library, whose `startBroker` method starts ZooKeeper and Kafka for you.

It also provides `destroyBroker`, which cleans up Kafka after the tests.

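For example (note that the test sets `auto.offset.reset` to `earliest`: with `latest`, a consumer in a new group would skip the event that was produced before it subscribed):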

/**
 * Integration test: starts ZooKeeper and Kafka through scalatest-eventstream's
 * `KafkaEmbeddedStream`, appends one event to "test-topic" and checks that a plain
 * `KafkaConsumer` reads it back.
 */
class MyStreamConsumerSpecs extends FunSpec with BeforeAndAfterAll with Matchers {
  implicit val config: StreamConfig =
    StreamConfig(streamTcpPort = 9092, streamStateTcpPort = 2181, stream = "test-topic", numOfPartition = 1)

  val kafkaStream = new KafkaEmbeddedStream

  override protected def beforeAll(): Unit = {
    kafkaStream.startBroker
  }

  override protected def afterAll(): Unit = {
    kafkaStream.destroyBroker
  }

  describe("Kafka Embedded stream") {
    it("does consume some events") {
      val event = """{"MyEvent" : { "myKey" : "myValue"}}"""

      //uses application.properties
      //emitter.broker.endpoint=localhost:9092
      //emitter.event.key.serializer=org.apache.kafka.common.serialization.StringSerializer
      //emitter.event.value.serializer=org.apache.kafka.common.serialization.StringSerializer
      kafkaStream.appendEvent("test-topic", event)

      val consumerProperties = new Properties()
      consumerProperties.put("bootstrap.servers", "localhost:9092")
      consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      consumerProperties.put("group.id", "something")
      // The event was appended before this consumer subscribed, so start from the beginning.
      consumerProperties.put("auto.offset.reset", "earliest")

      val myConsumer = new KafkaConsumer[String, String](consumerProperties)
      try {
        myConsumer.subscribe(java.util.Collections.singletonList("test-topic"))

        // A fresh consumer may spend its first poll(s) joining the group and return nothing,
        // so keep polling until records arrive or the deadline passes.
        // NOTE(review): poll(Long) is deprecated since Kafka 2.0 in favour of poll(Duration).
        val deadline = System.currentTimeMillis() + 10000L

        @scala.annotation.tailrec
        def pollUntilNonEmpty(): org.apache.kafka.clients.consumer.ConsumerRecords[String, String] = {
          val batch = myConsumer.poll(500L)
          if (!batch.isEmpty || System.currentTimeMillis() >= deadline) batch
          else pollUntilNonEmpty()
        }

        val events = pollUntilNonEmpty()

        events.count() shouldBe 1
        events.iterator().next().value() shouldBe event
      } finally {
        // Release the consumer's network resources and group membership even if an assertion fails.
        myConsumer.close()
      }
    }
  }
}

Upvotes: 6

Related Questions