Emil L

Reputation: 21071

Kafka test failing/succeeding intermittently

I am trying to write a simple test for an abstraction over the Kafka Scala client in Kafka 0.8.2. It basically just writes a message to Kafka and then tries to read it back. However, it was failing intermittently, so I boiled the test down to the code below. This test sometimes (rarely) passes and sometimes fails. What am I doing wrong?

package mykafkatest

import java.net.ServerSocket
import java.nio.file.Files
import java.util.{UUID, Properties}

import kafka.consumer.{Whitelist, ConsumerConfig, Consumer}
import kafka.producer.{ProducerConfig, Producer, KeyedMessage}
import kafka.serializer.StringDecoder
import kafka.server.KafkaConfig
import kafka.server.KafkaServerStartable
import org.apache.curator.test.TestingServer

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

class KafkaSenderTest extends org.scalatest.FunSpecLike with org.scalatest.ShouldMatchers with org.scalatest.BeforeAndAfterAll {

  import scala.concurrent.ExecutionContext.Implicits.global
  val zkServer = new TestingServer()

  val socket = new ServerSocket(0)
  val port = socket.getLocalPort.toString
  socket.close()
  val tmpDir = Files.createTempDirectory("kafka-test-logs")

  val serverProps = new Properties
  serverProps.put("broker.id", port)
  serverProps.put("log.dirs", tmpDir.toAbsolutePath.toString)
  serverProps.put("host.name", "localhost")
  serverProps.put("zookeeper.connect", zkServer.getConnectString)
  serverProps.put("port", port)

  val config = new KafkaConfig(serverProps)
  val kafkaServer = new KafkaServerStartable(config)

  override def beforeAll ={
    kafkaServer.startup()
  }

  override def afterAll = {
    kafkaServer.shutdown()
  }

  it("should put messages on a kafka queue") {
    println("zkServer: " + zkServer.getConnectString)
    println("broker port: " + port)

    val consumerProps = new Properties()
    consumerProps.put("group.id", UUID.randomUUID().toString)
    consumerProps.put("zookeeper.connect", zkServer.getConnectString)

    val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))
    val topic = "some-topic"
    val filterSpec = new Whitelist(topic)
    val stream = consumerConnector.createMessageStreamsByFilter(filterSpec, 1, new StringDecoder, new StringDecoder).head

    val producerProps = new Properties()
    producerProps.put("metadata.broker.list","localhost:"+port)

    val sender = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))
    val keyedMessage = new KeyedMessage[Array[Byte], Array[Byte]](topic, "awesome message".getBytes("UTF-8"))
    sender.send(keyedMessage)

    val msg = Await.result(Future { stream.take(1) }, 5 seconds)
    msg.headOption should not be(empty)

  }
}

EDIT: I have created a new project with the following build.sbt and the above code as a test class.

name := "mykafkatest"

version := "1.0"

scalaVersion := "2.11.5"


libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka" % "0.8.2.0",

  "org.scalatest" %% "scalatest" % "2.2.2" % "test",
  "org.apache.curator" % "curator-test" % "2.7.0" % "test"
)

With this setup the test seems to pass more often, but it still fails intermittently...

Upvotes: 0

Views: 1843

Answers (2)

C4stor

Reputation: 8026

You may have a race condition: the consumer may finish initializing only after the message has been sent, and then ignore that message because it starts at the largest offset by default.

Try adding

consumerProps.put("auto.offset.reset", "smallest")

to your consumer properties.
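
For context, here is a minimal sketch of how the consumer properties from the question might look with that setting added. The object and method names (ConsumerPropsSketch, consumerProps) are hypothetical and only serve the illustration; the property keys are the ones already used in the question plus auto.offset.reset.

```scala
import java.util.{Properties, UUID}

// Hypothetical helper, not part of the original test.
object ConsumerPropsSketch {
  def consumerProps(zkConnect: String): Properties = {
    val props = new Properties()
    // A fresh group id per run, as in the question, so no committed offsets exist.
    props.put("group.id", UUID.randomUUID().toString)
    props.put("zookeeper.connect", zkConnect)
    // With no committed offset, start from the earliest available message
    // instead of the default ("largest"), which skips messages that were
    // sent before the consumer finished initializing.
    props.put("auto.offset.reset", "smallest")
    props
  }
}
```

In the test this would replace the hand-built consumerProps, for example Consumer.create(new ConsumerConfig(ConsumerPropsSketch.consumerProps(zkServer.getConnectString))). Because the group id is random on every run, the consumer never has a stored offset, so the reset policy is what decides where it starts reading.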

Upvotes: 4

Noah

Reputation: 13959

I think this is some sort of message buffering issue. If you send 200 messages this works (for me):

(1 to 200).foreach(i => sender.send(keyedMessage))

Sending 199 messages fails. I tried changing configs around but couldn't find any magic that makes a single message work, though I'm sure some combination of settings would do it.

Upvotes: 0
