user2795708

Reputation: 41

Kafka MirrorMaker's consumer not fetching all messages from topics

I am trying to set up a Kafka mirroring mechanism, but the Kafka MirrorMaker consumer only reads data that arrives in the source cluster's topics after the MirrorMaker process is started, i.e. it does not read the data that was already stored in the topics beforehand.

I am using the Kafka MirrorMaker class for this, as follows:

/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer.config --num.streams 2 --producer.config producer.config --whitelist=".*"

The consumer.config settings used to read from the source Kafka cluster:

zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000
group.id=kafka-mirror

and the producer.config settings used to produce to the new mirrored Kafka cluster:

metadata.broker.list=localhost:9093
producer.type=sync
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder

Is there a way to configure the Kafka MirrorMaker consumer to read from the beginning of the topics in my source Kafka cluster? This is a bit strange, because I have defined a new consumer group (kafka-mirror) in the consumer.config settings, so the consumer should start from offset 0, i.e. from the beginning of the topics.
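For context, a quick way to confirm that the historical data is actually present in the source cluster is to read a topic from the beginning with the console consumer (the topic name below is just a placeholder):

# Old ZooKeeper-based console consumer; reads the topic from the very first offset
bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic my-topic --from-beginning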

Many thanks in advance!

Upvotes: 4

Views: 6384

Answers (3)

Guda uma shanker

Reputation: 182

A very late answer, but this might be helpful to someone who is still looking for a solution.

As of now, Kafka MirrorMaker doesn't support this. There is an open defect: KafkaMirror

Upvotes: 0

Jobin Ar

Reputation: 121

In the consumer properties, add:

auto.offset.reset=earliest

This should work.
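Note that earliest/latest are the accepted values for the newer consumer configured via bootstrap.servers; a minimal consumer.config sketch assuming that setup (the broker address is a placeholder):

# New-consumer configuration (bootstrap.servers-based); broker address is a placeholder
bootstrap.servers=localhost:9092
group.id=kafka-mirror
# Start from the earliest available offset when the group has no committed offset yet
auto.offset.reset=earliest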

Upvotes: 9

codejitsu

Reputation: 3182

Look at the auto.offset.reset parameter in the Kafka consumer configuration.

From Kafka documentation:

auto.offset.reset (default: largest)

What to do when there is no initial offset in ZooKeeper or if an offset is out of range:
* smallest: automatically reset the offset to the smallest offset
* largest: automatically reset the offset to the largest offset
* anything else: throw an exception to the consumer

If this is set to largest, the consumer may lose some messages when the number of partitions for the topics it subscribes to changes on the broker. To prevent data loss during partition addition, set auto.offset.reset to smallest.

So, using smallest for auto.offset.reset should fix your problem.
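A minimal sketch of the question's consumer.config with this setting added (all other values copied from the question):

# Old ZooKeeper-based consumer configuration, as in the question
zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000
group.id=kafka-mirror
# Start from the smallest (oldest) offset when the group has no committed offset in ZooKeeper
auto.offset.reset=smallest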

Upvotes: 6
