Reputation: 43
I have a system that pulls messages from a Kafka topic, and when it's unable to process messages because some external resource is unavailable, it shuts down the consumer, returns the message to the topic, and waits some time before starting the consumer again. The only problem is, shutting down doesn't work. Here's what I see in my logs:
2014-09-30 08:24:10,918 - com.example.kafka.KafkaConsumer [info] - [application-akka.actor.workflow-context-8] Shutting down kafka consumer for topic new-problem-reports 2014-09-30 08:24:10,927 - clients.kafka.ProblemReportObserver [info] - [application-akka.actor.workflow-context-8] Consumer shutdown 2014-09-30 08:24:11,946 - clients.kafka.ProblemReportObserver [warn] - [application-akka.actor.workflow-context-8] Sending 7410-1412090624000 back to the queue 2014-09-30 08:24:12,021 - clients.kafka.ProblemReportObserver [debug] - [kafka-akka.actor.kafka-consumer-worker-context-9] Message from partition 0: key=7410-1412090624000, msg=7410-1412090624000
There's a few layers at work here, but the important code is:
In KafkaConsumer.scala
:
protected def consumer: ConsumerConnector = Consumer.create(config.asKafkaConfig)
def shutdown() = {
logger.info(s"Shutting down kafka consumer for topic ${config.topic}")
consumer.shutdown()
}
In the routine that observes messages:
(processor ? ProblemReportRequest(problemReportKey)).map {
case e: ConnectivityInterruption =>
val backoff = 10.seconds
logger.warn(s"Can't connect to essential services, pausing for $backoff", e)
stop()
// XXX: Shutdown isn't instantaneous, so returning has to happen after a delay.
// Unfortunately, there's still a race condition here, plus there's a chance the
// system will be shut down before the message has been returned.
system.scheduler.scheduleOnce(100 millis) { returnMessage(message) }
system.scheduler.scheduleOnce(backoff) { start() }
false
case e: Exception => returnMessage(message, e)
case _ => true
}.recover { case e => returnMessage(message, e) }
And the stop method:
def stop() = {
if (consumerRunning.get()) {
consumer.shutdown()
consumerRunning.compareAndSet(true, false)
logger.info("Consumer shutdown")
} else {
logger.info("Consumer is already shutdown")
}
!consumerRunning.get()
}
Is this a bug, or am I doing it wrong?
Upvotes: 1
Views: 1388
Reputation: 1124
Because your consumer
is a def
. It creates a new Kafka instance and shut that new instance down when you call it like consumer.shutdown()
. Make consumer
a val
instead.
Upvotes: 1