Reputation: 21071
I am trying to write a simple test for an abstraction over the Kafka Scala client in Kafka 0.8.2. It basically just writes a message to Kafka and then tries to read it back. However, the test was failing intermittently, so I boiled the test code down to the code below. This test sometimes (rarely) passes and sometimes fails. What am I doing wrong?
package mykafkatest

import java.net.ServerSocket
import java.nio.file.Files
import java.util.{UUID, Properties}
import kafka.consumer.{Whitelist, ConsumerConfig, Consumer}
import kafka.producer.{ProducerConfig, Producer, KeyedMessage}
import kafka.serializer.StringDecoder
import kafka.server.KafkaConfig
import kafka.server.KafkaServerStartable
import org.apache.curator.test.TestingServer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

class KafkaSenderTest extends org.scalatest.FunSpecLike with org.scalatest.ShouldMatchers with org.scalatest.BeforeAndAfterAll {
  import scala.concurrent.ExecutionContext.Implicits.global

  // Embedded ZooKeeper for the broker to register with
  val zkServer = new TestingServer()

  // Grab a free port for the embedded broker
  val socket = new ServerSocket(0)
  val port = socket.getLocalPort.toString
  socket.close()

  val tmpDir = Files.createTempDirectory("kafka-test-logs")

  val serverProps = new Properties
  serverProps.put("broker.id", port)
  serverProps.put("log.dirs", tmpDir.toAbsolutePath.toString)
  serverProps.put("host.name", "localhost")
  serverProps.put("zookeeper.connect", zkServer.getConnectString)
  serverProps.put("port", port)

  val config = new KafkaConfig(serverProps)
  val kafkaServer = new KafkaServerStartable(config)

  override def beforeAll = {
    kafkaServer.startup()
  }

  override def afterAll = {
    kafkaServer.shutdown()
  }

  it("should put messages on a kafka queue") {
    println("zkServer: " + zkServer.getConnectString)
    println("broker port: " + port)

    val consumerProps = new Properties()
    consumerProps.put("group.id", UUID.randomUUID().toString)
    consumerProps.put("zookeeper.connect", zkServer.getConnectString)

    val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))
    val topic = "some-topic"
    val filterSpec = new Whitelist(topic)
    val stream = consumerConnector.createMessageStreamsByFilter(filterSpec, 1, new StringDecoder, new StringDecoder).head

    val producerProps = new Properties()
    producerProps.put("metadata.broker.list", "localhost:" + port)

    val sender = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))
    val keyedMessage = new KeyedMessage[Array[Byte], Array[Byte]](topic, "awesome message".getBytes("UTF-8"))
    sender.send(keyedMessage)

    val msg = Await.result(Future { stream.take(1) }, 5 seconds)
    msg.headOption should not be (empty)
  }
}
EDIT: I have created a new project with the following build.sbt and the above code as a test class.
name := "mykafkatest"
version := "1.0"
scalaVersion := "2.11.5"
libraryDependencies ++= Seq(
"org.apache.kafka" %% "kafka" % "0.8.2.0",
"org.scalatest" %% "scalatest" % "2.2.2" % "test",
"org.apache.curator" % "curator-test" % "2.7.0" % "test"
)
And the test seems to pass more often, but it still fails intermittently...
Upvotes: 0
Views: 1843
Reputation: 8026
You may have a race condition: the consumer finishes its initialization after the message has already been sent, and then ignores the message, because by default it starts reading at the largest offset.
Try adding
consumerProps.put("auto.offset.reset", "smallest")
to your consumer properties.
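Because your test uses a fresh random group.id each run, the group has no committed offset, so this setting makes the consumer replay the topic from the earliest available offset instead of waiting for new messages. A minimal sketch of the adjusted consumer setup, reusing the names from your test:

val consumerProps = new Properties()
consumerProps.put("group.id", UUID.randomUUID().toString)
consumerProps.put("zookeeper.connect", zkServer.getConnectString)
consumerProps.put("auto.offset.reset", "smallest") // read from the beginning of the topic, not just messages arriving after startup
val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))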
Upvotes: 4
Reputation: 13959
I think this is some sort of message-buffering issue. If you send 200 messages, this works (for me):
(1 to 200).foreach(i => sender.send(keyedMessage))
Sending 199 messages fails. I tried changing configs around but couldn't find any magic to make a single message work, though I'm sure there's some set of configs that could.
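If buffering is the culprit, the knobs I would try are on the producer side. A hedged sketch using standard 0.8.2 producer settings (I have not verified these make the single-message case pass):

val producerProps = new Properties()
producerProps.put("metadata.broker.list", "localhost:" + port)
producerProps.put("producer.type", "sync")      // send each message synchronously instead of batching in a background thread
producerProps.put("request.required.acks", "1") // wait for the partition leader to acknowledge the write
val sender = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))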
Upvotes: 0