Reputation: 1511
I've set up a Kafka solution with 1 producer and 1 consumer and validated that all the connections are correct (I can produce messages and consume them). The ZK server and Kafka server are started and stable.
As described, my issue is that the Consumer will read messages just fine, and from where it left off, but will only read messages that were created before it started reading. After that, new messages will not be read until I kill the Consumer and restart it.
Relevant Consumer Scala code
import kafka.consumer.{Consumer, ConsumerConfig, KafkaStream, Whitelist}
import kafka.serializer.StringDecoder

val consumer = Consumer.create(new ConsumerConfig(readConsumerPropertiesFromConfig))
val filterSpec = new Whitelist("some-valid-topic")
val stream: KafkaStream[String, String] =
  consumer.createMessageStreamsByFilter(filterSpec, 1, new StringDecoder, new StringDecoder).head
log.info(s"Consumer started. Listening to topics [$filterSpec].")
def read() = stream map digest
where digest takes a MessageAndMetadata and has fun with it:
def digest(messageAndMeta: MessageAndMetadata[String, String]) = {
  log.info(s"processing the message [$messageAndMeta]")
  // ...
}
The properties are:
properties.put("group.id", "default_consumer_group")
properties.put("zookeeper.connect", "localhost:2181")
properties.put("auto.offset.reset", "smallest")
properties.put("consumer.timeout.ms", 2000)
A timeline of what I can reproduce with this
Any thoughts? Thanks.
Upvotes: 3
Views: 3540
Reputation: 1511
The problem was that I overlooked a ConsumerTimeoutException that was crashing my Consumer, and I mistook this for "the Consumer hanging forever".
From the docs on Consumer configuration:
By default, this value is -1 and a consumer blocks indefinitely if no new message is available for consumption.
I had this set to 2000 ms, after which it would throw. By setting this to -1, I get the desired behaviour, although the ideal solution (for my use case) would be to implement something along the lines of this project: https://github.com/kciesielski/reactive-kafka
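In case it helps, here is roughly what my read loop looks like with the timeout at -1 (a minimal sketch, not my exact code; digest and log are the same as in the question, and the blocking iterator loop replaces the original stream map digest):

import kafka.consumer.KafkaStream

// With consumer.timeout.ms = -1, hasNext() blocks until a message
// arrives, so this loop keeps consuming new messages as they are
// produced instead of throwing after a quiet period.
def read(stream: KafkaStream[String, String]): Unit = {
  val it = stream.iterator()
  while (it.hasNext()) {
    digest(it.next())
  }
}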
This thread pointed me in the right direction
Hope it helps someone else.
Upvotes: 2
Reputation: 2762
I suspect that your problem is calling map in order to consume from the stream.
Try using the iterator directly via stream.iterator.hasNext and stream.iterator.next and see if that makes a difference. There is an example here: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
Also, you should expect a ConsumerTimeoutException if no data is available for 2 seconds, so make sure your code is prepared to handle that.
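Something along these lines (a sketch against the 0.8 high-level consumer API; the process parameter is a stand-in for whatever you do with each message):

import kafka.consumer.{ConsumerTimeoutException, KafkaStream}
import kafka.message.MessageAndMetadata

def consume(stream: KafkaStream[String, String],
            process: MessageAndMetadata[String, String] => Unit): Unit = {
  val it = stream.iterator()
  try {
    // hasNext() blocks until a message is available and throws
    // ConsumerTimeoutException once consumer.timeout.ms (2 s in
    // your config) elapses with no data.
    while (it.hasNext()) {
      process(it.next())
    }
  } catch {
    case _: ConsumerTimeoutException =>
      // No data within the timeout; decide here whether to back off,
      // retry with a fresh stream, or shut the consumer down cleanly.
  }
}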
Upvotes: 1