Reputation: 41
I am trying to set up a Kafka mirror, but the Kafka MirrorMaker's consumer on the source Kafka cluster only reads data that arrives in the topics after the MirrorMaker process starts. It does not read the data that was already stored in the topics.
I am using the Kafka MirrorMaker class for that:
/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer.config --num.streams 2 --producer.config producer.config --whitelist=".*"
The consumer.config to read from the source Kafka cluster:
zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000
group.id=kafka-mirror
and the producer.config settings to produce to the new mirrored Kafka cluster:
metadata.broker.list=localhost:9093
producer.type=sync
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder
Is there a way to make the Kafka MirrorMaker consumer read from the beginning of the topics in my source Kafka cluster? This seems strange, because I defined a new consumer group (kafka-mirror) in consumer.config, so I expected the consumer to start from offset 0, i.e. from the beginning of the topics.
Many thanks in advance!
Upvotes: 4
Views: 6384
Reputation: 182
Very late answer, but this might help someone who is still looking.
As of now, Kafka MirrorMaker doesn't support this. There is an open defect: KafkaMirror
Upvotes: 0
Reputation: 121
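In the consumer properties, add:
auto.offset.reset=earliest
This should work with the new Java consumer (Kafka 0.9 and later). The older ZooKeeper-based consumer, which the config in the question uses, expects the value smallest instead.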
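The accepted value name depends on the consumer generation: earliest/latest are used by the new Java consumer (Kafka 0.9+), while the old ZooKeeper-based consumer, implied by the zookeeper.connect setting in the question, uses smallest/largest. The sketch below is a hypothetical helper (offset_reset_value and has_zookeeper_connect are illustrative names, not Kafka APIs) that picks the right spelling, assuming that the presence of zookeeper.connect is a good enough signal for the old consumer.

```python
# Hypothetical helper: maps a "start from the beginning/end" intent to the
# auto.offset.reset spelling each Kafka consumer generation accepts.
OLD_CONSUMER = {"earliest": "smallest", "latest": "largest"}  # ZooKeeper-based consumer
NEW_CONSUMER = {"earliest": "earliest", "latest": "latest"}   # Java consumer, Kafka 0.9+


def offset_reset_value(intent: str, uses_zookeeper: bool) -> str:
    """Return the auto.offset.reset value for the given consumer type."""
    table = OLD_CONSUMER if uses_zookeeper else NEW_CONSUMER
    if intent not in table:
        raise ValueError(f"unknown intent: {intent!r}")
    return table[intent]


def has_zookeeper_connect(props: dict) -> bool:
    # Heuristic (assumption): the old consumer is configured via zookeeper.connect,
    # the new one via bootstrap.servers.
    return "zookeeper.connect" in props
```

For the consumer.config in the question, offset_reset_value("earliest", has_zookeeper_connect({"zookeeper.connect": "127.0.0.1:2181"})) gives smallest.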
Upvotes: 9
Reputation: 3182
Look at the auto.offset.reset parameter in the Kafka consumer configuration.
From the Kafka documentation:
auto.offset.reset (default: largest)
What to do when there is no initial offset in Zookeeper or if an offset is out of range:
* smallest : automatically reset the offset to the smallest offset
* largest : automatically reset the offset to the largest offset
* anything else: throw exception to the consumer.
If this is set to largest, the consumer may lose some messages when the number of partitions, for the topics it subscribes to, changes on the broker. To prevent data loss during partition addition, set auto.offset.reset to smallest
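The documented rule can be modelled as a small function. This is a minimal sketch of the semantics quoted above, not Kafka's actual code; starting_offset and its parameters are hypothetical names, and it assumes a committed offset counts as "in range" when it lies between the partition's log start and log end offsets.

```python
from typing import Optional


def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    policy: str = "largest") -> int:
    """Model of where the old consumer begins reading one partition."""
    # A committed offset inside [log_start, log_end] is used as-is;
    # auto.offset.reset is not consulted at all in that case.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # No committed offset, or it is out of range: apply auto.offset.reset.
    if policy == "smallest":
        return log_start
    if policy == "largest":
        return log_end
    # "anything else: throw exception to the consumer"
    raise ValueError(f"invalid auto.offset.reset: {policy!r}")
```

For a partition holding offsets 0 to 999 (log end 1000), a brand-new group with the default policy starts at 1000, so it only sees new messages, which matches the behaviour in the question; with smallest it starts at 0. A group that has already committed 1000 keeps reading from 1000 even with smallest.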
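So setting auto.offset.reset to smallest should fix your problem. Note that it only applies when the group has no committed offset (or the offset is out of range), so if the kafka-mirror group has already committed offsets from earlier runs, switch to a new group.id as well.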
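As a sketch of the resulting consumer.config, the snippet below writes the question's settings plus auto.offset.reset=smallest as simple key=value lines. The group.id kafka-mirror-from-start and the write_properties helper are hypothetical; the fresh group name is only there so that no previously committed offsets exist.

```python
from pathlib import Path

# Settings from the question, plus the fix. The group.id below is a
# hypothetical fresh name so that no previously committed offsets exist.
CONSUMER_PROPS = {
    "zookeeper.connect": "127.0.0.1:2181",
    "zookeeper.connection.timeout.ms": "6000",
    "group.id": "kafka-mirror-from-start",
    "auto.offset.reset": "smallest",
}


def write_properties(path: Path, props: dict) -> None:
    """Write key=value lines in the simple format used by consumer.config."""
    lines = [f"{key}={value}" for key, value in props.items()]
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")
```

Calling write_properties(Path("consumer.config"), CONSUMER_PROPS) produces a file that can be passed to MirrorMaker with --consumer.config as before.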
Upvotes: 6