mlg

Reputation: 1511

Kafka Consumer reads up to when it started, then hangs forever

I've set up a Kafka solution with 1 producer and 1 consumer and validated that all the connections are correct (I can produce messages and consume them). The ZK Server & Kafka Server are started and stable.

As described, my issue is that the Consumer reads messages just fine, and from where it left off, but it only reads messages that were produced before it started reading. After that, new messages are not read until I kill the Consumer and restart it.

Relevant Consumer Scala code

  val consumer = Consumer.create(new ConsumerConfig(readConsumerPropertiesFromConfig))
  val filterSpec = new Whitelist("some-valid-topic")

  val stream: KafkaStream[String, String] =
    consumer.createMessageStreamsByFilter(filterSpec, 1, new StringDecoder, new StringDecoder).head

  log.info(s"Consumer started. Listening to topics [$filterSpec].")

  def read() = stream map digest

Where digest takes a MessageAndMetadata and has fun with it

def digest(messageAndMeta: MessageAndMetadata[String, String]) = {
    log.info(s"processing the message [$messageAndMeta]")
    // ... processing elided ...
}

Properties are

properties.put("group.id", "default_consumer_group")
properties.put("zookeeper.connect", "localhost:2181")
properties.put("auto.offset.reset", "smallest")
properties.put("consumer.timeout.ms", "2000")

A timeline of what I can reproduce with this

Any thoughts? Thanks.

Upvotes: 3

Views: 3540

Answers (2)

mlg

Reputation: 1511

The problem was that I had overlooked a ConsumerTimeoutException that was crashing my Consumer, and I mistook this for "the Consumer hanging forever".

From the docs on Consumer configuration:

By default, this value is -1 and a consumer blocks indefinitely if no new message is available for consumption.

I had this set to a number of seconds, after which it would throw. By setting this to -1, I get the desired behaviour, although the ideal solution (for my use case) would be to implement something along the lines of this project: https://github.com/kciesielski/reactive-kafka
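For reference, the fix amounts to a one-property change against the configuration from the question (note that `Properties` values should be strings for the old consumer's `ConsumerConfig` to pick them up):

```scala
import java.util.Properties

val properties = new Properties()
properties.put("group.id", "default_consumer_group")
properties.put("zookeeper.connect", "localhost:2181")
properties.put("auto.offset.reset", "smallest")
// -1 (the default) makes the consumer block indefinitely when no new
// message is available, instead of throwing ConsumerTimeoutException
properties.put("consumer.timeout.ms", "-1")
```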

This thread pointed me in the right direction

Hope it helps someone else.

Upvotes: 2

ppearcy

Reputation: 2762

I suspect that your problem is calling map in order to consume from the stream.

Try using the iterator directly via stream.iterator.hasNext and stream.iterator.next and see if that makes a difference. There is an example here: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

Also, you should expect a ConsumerTimeoutException if no data is available for 2 seconds, so make sure your code is prepared to handle it.
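A sketch of both suggestions together, assuming the same Kafka 0.8 consumer API as in the question (`stream`, `digest`, and `log` are from the question; this won't compile without the Kafka client on the classpath):

```scala
import kafka.consumer.ConsumerTimeoutException

// Consume via the blocking iterator instead of map.
// ConsumerTimeoutException only means "no message arrived within
// consumer.timeout.ms"; it is safe to catch it and keep polling.
val it = stream.iterator()
var running = true
while (running) {
  try {
    while (it.hasNext()) digest(it.next())
  } catch {
    case _: ConsumerTimeoutException =>
      log.info("No new messages within the timeout; polling again.")
      // set running = false here if you want to shut down instead
  }
}
```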

Upvotes: 1
