Reputation: 9118
I have set up MirrorMaker 2 to replicate data between two DCs.
My mm2.properties:
# mm2.properties
name=source->dest
clusters=source, dest
source.bootstrap.servers=localhost:9091
dest.bootstrap.servers=localhost:9092
source->dest.enabled=true
offset.storage.partitions=2
config.storage.replication.factor=1
status.storage.replication.factor=1
I see the following on MM2 startup:
[2020-02-16 07:31:07,547] INFO MirrorConnectorConfig values:
admin.timeout.ms = 60000
checkpoints.topic.replication.factor = 3
config.action.reload = restart
config.properties.blacklist = [follower\.replication\.throttled\.replicas, leader\.replication\.throttled\.replicas, message\.timestamp\.difference\.max\.ms, message\.timestamp\.type, unclean\.leader\.election\.enable, min\.insync\.replicas]
config.property.filter.class = class org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter
connector.class = org.apache.kafka.connect.mirror.MirrorCheckpointConnector
consumer.poll.timeout.ms = 1000
emit.checkpoints.enabled = true
emit.checkpoints.interval.seconds = 60
emit.heartbeats.enabled = true
emit.heartbeats.interval.seconds = 1
enabled = true
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
group.filter.class = class org.apache.kafka.connect.mirror.DefaultGroupFilter
groups = [.*]
groups.blacklist = [console-consumer-.*, connect-.*, __.*]
header.converter = null
heartbeats.topic.replication.factor = 3
key.converter = null
metric.reporters = null
name = source->dest
offset-syncs.topic.replication.factor = 3
offset.lag.max = 100
refresh.groups.enabled = true
refresh.groups.interval.seconds = 600
refresh.topics.enabled = true
refresh.topics.interval.seconds = 600
replication.factor = 2
replication.policy.class = class org.apache.kafka.connect.mirror.DefaultReplicationPolicy
replication.policy.separator = .
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
source.cluster.alias = source
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
sync.topic.acls.enabled = true
sync.topic.acls.interval.seconds = 600
sync.topic.configs.enabled = true
sync.topic.configs.interval.seconds = 600
target.cluster.alias = dest
task.assigned.groups = null
task.assigned.partitions = null
tasks.max = 1
topic.filter.class = class org.apache.kafka.connect.mirror.DefaultTopicFilter
topics = [.*]
topics.blacklist = [.*[\-\.]internal, .*\.replica, __.*]
transforms = []
value.converter = null
(org.apache.kafka.connect.mirror.MirrorConnectorConfig:347)
My data is being replicated as expected: the source topic gets created in the destination cluster as source.<TOPIC>. But the consumer group offset is not being replicated.
I started a consumer group in the source cluster:
./kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic test-1 --group test-1-group
I consumed a few messages and stopped it. Then I produced new messages to this topic, and MirrorMaker mirrored the data to the target cluster.
I tried to consume messages from the target cluster as follows:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic source.test-1 --group test-1-group
Since I use the same consumer group, I was expecting my offset to be synced too, so that I wouldn't consume the messages I had already consumed in cluster1. But I still consume all the messages. Is there anything I am missing here?
Upvotes: 8
Views: 10122
Reputation: 399
Kafka 2.7 introduced "automated consumer offset sync". By default, consumer offsets are not synced between clusters; you have to enable this feature explicitly with sync.group.offsets.enabled=true.
See KIP-545: support automated consumer offset sync across clusters in MM 2.0.
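A minimal sketch of what enabling it could look like in the question's mm2.properties, assuming Kafka 2.7 or later and the source/dest aliases from the question. sync.group.offsets.enabled and sync.group.offsets.interval.seconds are the KIP-545 property names; the 10-second interval is only an example value. As far as I know, MM2 only writes translated offsets for groups that have no active members on the target cluster, so stop test-1-group's consumer on dest while the sync happens.

```properties
# mm2.properties additions (Kafka 2.7+)
# Write translated offsets of mirrored groups into the dest cluster's consumer offsets
source->dest.sync.group.offsets.enabled=true
# How often to sync them (default: 60 seconds); 10 is just an example value
source->dest.sync.group.offsets.interval.seconds=10
# The sync is driven by checkpoints, which are emitted by default
source->dest.emit.checkpoints.enabled=true
```

With this in place, running kafka-console-consumer.sh against localhost:9092 on source.test-1 with --group test-1-group should resume near where the group left off on the source instead of from the beginning.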
Upvotes: 4
Reputation: 23
My data is being replicated as expected. Source topic gets created in the destination cluster as source.<TOPIC>. But, the consumer group offset is not being replicated.
By default, MM2 won't replicate consumer groups whose names match console-consumer-.*, which is the pattern of the group ids kafka-console-consumer generates when no --group is given. In the MM2 logs on startup, we can see groups.blacklist = [console-consumer-.*, connect-.*, __.*]. You can override this in your mm2.properties configuration file.
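A minimal sketch of such an override, assuming the dedicated MM2 mode and the source/dest aliases from the question; the patterns are example values. Newer Kafka releases rename this property to groups.exclude (the old name is kept as a deprecated alias). Note that an explicitly named group like test-1-group does not match console-consumer-.*, so the default list only affects groups with auto-generated names.

```properties
# mm2.properties: replace the default group exclusion list
# (console-consumer-.*, connect-.*, __.*) so console-consumer groups are mirrored too
source->dest.groups.blacklist=connect-.*, __.*
# Alternatively, mirror only the groups you care about:
# source->dest.groups=test-1-group
```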
Since I use the same consumer group, I was expecting my offset also to be synced, so that I won't consume the same messages which I consumed in cluster1.
Once the consumer groups are properly mirrored and checkpoints are enabled, an internal checkpoints topic should be created automatically in your destination cluster. With the default replication policy it is named after the source cluster alias, so here it would be source.checkpoints.internal. This checkpoint topic contains the last committed offsets in the source and destination clusters for mirrored topic partitions in each consumer group.
Then you can use Kafka's RemoteClusterUtils utility class to translate these offsets and get the synced offsets for source.test-1 that map to the consumer's last committed offsets for test-1. If you end up creating a consumer with Java, you can add connect-mirror-client, the artifact that contains RemoteClusterUtils, as a dependency to your project:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-mirror-client</artifactId>
<version>2.4.0</version>
</dependency>
Otherwise, you will likely have to write a tool that wraps RemoteClusterUtils.java to get the translated offsets. This functionality, or something similar, looks to be planned for a future release of MM2.
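A minimal sketch of such a tool, assuming Kafka 2.4+ with connect-mirror-client and kafka-clients on the classpath, the aliases and group from the question, and that the group's consumers on the destination cluster are stopped. The class name TranslateGroupOffsets and its argument order are hypothetical. It calls RemoteClusterUtils.translateOffsets against the destination cluster (where source.checkpoints.internal lives), prints the result, and commits it for the same group with AdminClient.alterConsumerGroupOffsets.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

// Hypothetical tool: TranslateGroupOffsets [dest-bootstrap] [source-alias] [group]
public class TranslateGroupOffsets {

    // Client config for the cluster that holds <source-alias>.checkpoints.internal (dest)
    static Map<String, Object> clientConfig(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return props;
    }

    public static void main(String[] args) throws Exception {
        String destBootstrap = args.length > 0 ? args[0] : "localhost:9092";
        String sourceAlias = args.length > 1 ? args[1] : "source";
        String group = args.length > 2 ? args[2] : "test-1-group";
        Map<String, Object> props = clientConfig(destBootstrap);

        // Reads the checkpoints emitted for `group`; the keys are the mirrored
        // partitions on dest (e.g. source.test-1-0), the values the translated offsets
        Map<TopicPartition, OffsetAndMetadata> translated =
                RemoteClusterUtils.translateOffsets(props, sourceAlias, group, Duration.ofSeconds(30));
        if (translated.isEmpty()) {
            System.out.println("No checkpoints found for group " + group);
            return;
        }
        translated.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));

        // Commit the translated offsets for the same group on dest. This is expected
        // to fail if the group still has active members there, so stop them first.
        try (AdminClient admin = AdminClient.create(props)) {
            admin.alterConsumerGroupOffsets(group, translated).all().get();
        }
        System.out.println("Committed translated offsets for group " + group);
    }
}
```

Committing through the admin client rather than seeking in the consumer means any later consumer in test-1-group, including kafka-console-consumer.sh, picks up the translated position without code changes.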
Upvotes: 2
Reputation: 11
I see that your checkpoint configuration is:
emit.checkpoints.enabled = true
emit.checkpoints.interval.seconds = 60
So your checkpoints topic will reflect new commits only after up to 60 seconds. If you try immediately, it won't work, so try again after a minute.
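If waiting is too slow while testing, the intervals can be shortened. A minimal sketch with example values, assuming the source/dest aliases from the question. The startup log in the question also shows refresh.groups.interval.seconds = 600; as I understand it, a consumer group created after MM2 starts (like test-1-group) may not be picked up for checkpointing until the next refresh, up to 10 minutes later.

```properties
# mm2.properties: shorter intervals for testing (example values)
# Emit checkpoints every 10 s instead of every 60 s
source->dest.emit.checkpoints.interval.seconds=10
# Look for new consumer groups every 30 s instead of every 600 s
source->dest.refresh.groups.interval.seconds=30
```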
Upvotes: 1
Reputation: 24202
There are several fundamental reasons why replicating offsets is non-trivial.
Looking at the MM2 KIP (KIP-382), there's an "offset sync topic" mentioned. In your code, you can use the RemoteClusterUtils class to translate checkpoints between clusters:
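Map<TopicPartition, OffsetAndMetadata> newOffsets = RemoteClusterUtils.translateOffsets(
    newClusterProperties, oldClusterName, consumerGroupId, Duration.ofMinutes(1)
);
// the consumer must be assigned the (renamed) partitions before seeking
consumer.assign(newOffsets.keySet());
newOffsets.forEach(consumer::seek);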
This was taken from the following presentation: https://www.slideshare.net/ConfluentInc/disaster-recovery-with-mirrormaker-20-ryanne-dolan-cloudera-kafka-summit-london-2019
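To see why the group's raw source offsets can't simply be reused on the destination, here is a deliberately simplified, hypothetical model of the offset-sync idea for a single partition: pairs of (upstream offset, downstream offset) are recorded, and a committed upstream offset is translated using the closest sync at or below it. This is not MM2's code; the real translation logic differs between Kafka versions (newer releases translate more conservatively). The class name and the 100 -> 0 sync are made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of offset translation for ONE partition (not MM2's code)
public class OffsetTranslationModel {
    // upstream offset -> downstream offset, as recorded by offset syncs
    private final TreeMap<Long, Long> syncs = new TreeMap<>();

    // Record that the record at `upstream` was written downstream at `downstream`
    public void recordSync(long upstream, long downstream) {
        syncs.put(upstream, downstream);
    }

    // Translate a committed upstream offset using the closest sync at or below it;
    // returns -1 when the offset predates every known sync
    public long translate(long upstreamCommitted) {
        Map.Entry<Long, Long> sync = syncs.floorEntry(upstreamCommitted);
        if (sync == null) {
            return -1L;
        }
        // The model assumes records after a sync map one-to-one (no gaps)
        return sync.getValue() + (upstreamCommitted - sync.getKey());
    }

    public static void main(String[] args) {
        OffsetTranslationModel model = new OffsetTranslationModel();
        // Mirroring began when the source partition's earliest offset was 100,
        // so upstream offset 100 landed at downstream offset 0
        model.recordSync(100L, 0L);
        System.out.println(model.translate(150L)); // 50
        System.out.println(model.translate(50L));  // -1
    }
}
```

In this model a group that committed offset 150 on test-1 must resume at 50 on source.test-1; reusing 150 would skip data, which is why a translation step is needed at all.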
Alternatively, you could use the seek-by-timestamp API (offsetsForTimes) to start your consumer group on the destination at the rough time at which data was delivered to the destination (or delivered to the source, if the destination brokers' log-append-timestamp settings don't overwrite those times). You'd need to rewind a little for safety.
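A minimal sketch of the timestamp approach with the plain consumer API, assuming kafka-clients on the classpath and the dest cluster, topic, and group from the question. The start time 2020-02-16T07:30:00Z and the five-minute safety margin are made-up example values; in practice you'd use roughly when the group stopped consuming on the source. Partitions with no record at or after the timestamp are positioned at the end.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SeekByTimestamp {

    // Target timestamp (epoch millis) = approximate resume time minus a safety margin
    static long rewind(Instant approxResumeTime, Duration safetyMargin) {
        return approxResumeTime.minus(safetyMargin).toEpochMilli();
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // dest cluster
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-1-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        String topic = "source.test-1";
        // Hypothetical example values: when the group stopped on the source, minus 5 minutes
        long ts = rewind(Instant.parse("2020-02-16T07:30:00Z"), Duration.ofMinutes(5));

        try (KafkaConsumer<String, String> consumer =
                new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);

            // Earliest offset whose timestamp is >= ts, per partition
            Map<TopicPartition, Long> query = new HashMap<>();
            for (TopicPartition tp : partitions) {
                query.put(tp, ts);
            }
            Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(query);

            for (TopicPartition tp : partitions) {
                OffsetAndTimestamp hit = found.get(tp);
                if (hit != null) {
                    consumer.seek(tp, hit.offset());
                } else {
                    // No record at or after ts in this partition: start at the end
                    consumer.seekToEnd(Collections.singletonList(tp));
                }
            }

            consumer.poll(Duration.ofSeconds(1)).forEach(r ->
                    System.out.printf("%d@%d: %s%n", r.partition(), r.offset(), r.value()));
        }
    }
}
```

Because auto-commit is disabled here, the new position is not stored for the group; call commitSync() after processing if later consumers in test-1-group should continue from it.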
Upvotes: 4