Reputation: 6105
I have a Kafka consumer (written in Scala) that extracts the latest records from Kafka. The consumer looks like this:
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

val consumerProperties = new Properties()
consumerProperties.put("bootstrap.servers", "localhost:9092")
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.put("group.id", "something")
consumerProperties.put("auto.offset.reset", "latest")

val consumer = new KafkaConsumer[String, String](consumerProperties)
consumer.subscribe(java.util.Collections.singletonList("topic"))
Now I want to write an integration test for it. Is there any recommended way or best practice for testing Kafka consumers?
Upvotes: 1
Views: 14304
Reputation: 31262
You need to start ZooKeeper and Kafka programmatically for integration tests.
1.1 Start ZooKeeper (ZooKeeperServer)
import java.net.InetSocketAddress
import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
import scala.reflect.io.Directory

def startZooKeeper(zooKeeperPort: Int, zkLogsDir: Directory): ServerCnxnFactory = {
  val tickTime = 2000
  // use the temporary test directory for both the snapshot dir and the log dir
  val zkServer = new ZooKeeperServer(zkLogsDir.toFile.jfile, zkLogsDir.toFile.jfile, tickTime)
  val factory = ServerCnxnFactory.createFactory
  // bind to all interfaces on the given port, allowing up to 1024 client connections
  factory.configure(new InetSocketAddress("0.0.0.0", zooKeeperPort), 1024)
  factory.startup(zkServer)
  factory
}
1.2 Start Kafka (KafkaServer)
case class StreamConfig(streamTcpPort: Int = 9092,
                        streamStateTcpPort: Int = 2181,
                        stream: String,
                        numOfPartition: Int = 1,
                        nodes: Map[String, String] = Map.empty)
import java.util.Properties
import kafka.server.{KafkaConfig, KafkaServer}

def startKafkaBroker(config: StreamConfig,
                     kafkaLogDir: Directory): KafkaServer = {
  val syncServiceAddress = s"localhost:${config.streamStateTcpPort}"

  val properties: Properties = new Properties
  properties.setProperty("zookeeper.connect", syncServiceAddress)
  properties.setProperty("broker.id", "0")
  properties.setProperty("host.name", "localhost")
  properties.setProperty("advertised.host.name", "localhost")
  properties.setProperty("port", config.streamTcpPort.toString)
  properties.setProperty("auto.create.topics.enable", "true")
  properties.setProperty("log.dir", kafkaLogDir.toAbsolute.path)
  properties.setProperty("log.flush.interval.messages", 1.toString)
  properties.setProperty("log.cleaner.dedupe.buffer.size", "1048577")

  // allow individual tests to override or extend the broker configuration
  config.nodes.foreach {
    case (key, value) => properties.setProperty(key, value)
  }

  val broker = new KafkaServer(new KafkaConfig(properties))
  broker.startup()
  println(s"KafkaStream Broker started at ${properties.get("host.name")}:${properties.get("port")} at ${kafkaLogDir.toFile}")
  broker
}
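A test's setup and teardown can then wire these two helpers together. This is a minimal sketch; the temporary directories created via Directory.makeTemp and the shutdown calls at the end are my assumptions, not part of the helpers above:
import scala.reflect.io.Directory

// create throwaway log directories for the embedded services
val zkLogsDir = Directory.makeTemp("zookeeper-logs")
val kafkaLogDir = Directory.makeTemp("kafka-logs")

// start ZooKeeper first, then the Kafka broker that registers with it
val zkFactory = startZooKeeper(2181, zkLogsDir)
val kafkaBroker = startKafkaBroker(StreamConfig(stream = "test-topic"), kafkaLogDir)

// ... produce and consume test events here ...

// tear everything down once the tests have finished
kafkaBroker.shutdown()
kafkaBroker.awaitShutdown()
zkFactory.shutdown()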
Emit some events to the stream using KafkaProducer, for example as sketched below.
Then consume them with your consumer to test and verify it's working.
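A minimal producer sketch for the emit step could look like the following; the topic name and the blocking get() on send are assumptions chosen to mirror the consumer above:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val producerProperties = new Properties()
producerProperties.put("bootstrap.servers", "localhost:9092")
producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](producerProperties)
// send() is asynchronous; get() blocks until the broker acknowledges the record,
// so the event is on the topic before the consumer polls
producer.send(new ProducerRecord[String, String]("test-topic", """{"MyEvent" : { "myKey" : "myValue"}}""")).get()
producer.close()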
You can use scalatest-eventstream, which has a startBroker method that starts ZooKeeper and Kafka for you. It also has destroyBroker, which cleans up your Kafka after the tests.
For example:
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}

class MyStreamConsumerSpecs extends FunSpec with BeforeAndAfterAll with Matchers {

  implicit val config =
    StreamConfig(streamTcpPort = 9092, streamStateTcpPort = 2181, stream = "test-topic", numOfPartition = 1)

  val kafkaStream = new KafkaEmbeddedStream

  override protected def beforeAll(): Unit = {
    kafkaStream.startBroker
  }

  override protected def afterAll(): Unit = {
    kafkaStream.destroyBroker
  }

  describe("Kafka Embedded stream") {
    it("does consume some events") {
      // emitter settings come from application.properties:
      //   emitter.broker.endpoint=localhost:9092
      //   emitter.event.key.serializer=org.apache.kafka.common.serialization.StringSerializer
      //   emitter.event.value.serializer=org.apache.kafka.common.serialization.StringSerializer
      kafkaStream.appendEvent("test-topic", """{"MyEvent" : { "myKey" : "myValue"}}""")

      val consumerProperties = new Properties()
      consumerProperties.put("bootstrap.servers", "localhost:9092")
      consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      consumerProperties.put("group.id", "something")
      consumerProperties.put("auto.offset.reset", "earliest")

      val myConsumer = new KafkaConsumer[String, String](consumerProperties)
      myConsumer.subscribe(java.util.Collections.singletonList("test-topic"))

      val events = myConsumer.poll(2000)
      events.count() shouldBe 1
      events.iterator().next().value() shouldBe """{"MyEvent" : { "myKey" : "myValue"}}"""
      println("=================" + events.count())
    }
  }
}
Upvotes: 6