user1578872

Reputation: 9118

Kafka MirrorMaker2 - not mirroring consumer group offsets

I have set up MirrorMaker 2 to replicate data between two DCs.

My mm2.properties:

# mm2.properties
name=source->dest
clusters=source, dest

source.bootstrap.servers=localhost:9091
dest.bootstrap.servers=localhost:9092

source->dest.enabled=true

offset.storage.partitions=2
config.storage.replication.factor=1
status.storage.replication.factor=1

On MM2 startup I see the following:

[2020-02-16 07:31:07,547] INFO MirrorConnectorConfig values: 
    admin.timeout.ms = 60000
    checkpoints.topic.replication.factor = 3
    config.action.reload = restart
    config.properties.blacklist = [follower\.replication\.throttled\.replicas, leader\.replication\.throttled\.replicas, message\.timestamp\.difference\.max\.ms, message\.timestamp\.type, unclean\.leader\.election\.enable, min\.insync\.replicas]
    config.property.filter.class = class org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter
    connector.class = org.apache.kafka.connect.mirror.MirrorCheckpointConnector
    consumer.poll.timeout.ms = 1000
    emit.checkpoints.enabled = true
    emit.checkpoints.interval.seconds = 60
    emit.heartbeats.enabled = true
    emit.heartbeats.interval.seconds = 1
    enabled = true
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    group.filter.class = class org.apache.kafka.connect.mirror.DefaultGroupFilter
    groups = [.*]
    groups.blacklist = [console-consumer-.*, connect-.*, __.*]
    header.converter = null
    heartbeats.topic.replication.factor = 3
    key.converter = null
    metric.reporters = null
    name = source->dest
    offset-syncs.topic.replication.factor = 3
    offset.lag.max = 100
    refresh.groups.enabled = true
    refresh.groups.interval.seconds = 600
    refresh.topics.enabled = true
    refresh.topics.interval.seconds = 600
    replication.factor = 2
    replication.policy.class = class org.apache.kafka.connect.mirror.DefaultReplicationPolicy
    replication.policy.separator = .
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    source.cluster.alias = source
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    sync.topic.acls.enabled = true
    sync.topic.acls.interval.seconds = 600
    sync.topic.configs.enabled = true
    sync.topic.configs.interval.seconds = 600
    target.cluster.alias = dest
    task.assigned.groups = null
    task.assigned.partitions = null
    tasks.max = 1
    topic.filter.class = class org.apache.kafka.connect.mirror.DefaultTopicFilter
    topics = [.*]
    topics.blacklist = [.*[\-\.]internal, .*\.replica, __.*]
    transforms = []
    value.converter = null
 (org.apache.kafka.connect.mirror.MirrorConnectorConfig:347)

My data is being replicated as expected: each source topic gets created in the destination cluster as source.<TOPIC>. But the consumer group offsets are not being replicated.

Started a consumer group in the source cluster.

./kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic test-1 --group test-1-group

Consumed a few messages and stopped it. Posted new messages to this topic, and MirrorMaker mirrored the data to the target cluster.

I tried to consume messages from the target cluster as follows.

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic source.test-1 --group test-1-group

Since I use the same consumer group, I was expecting my offsets to be synced as well, so that I wouldn't re-consume the messages I had already consumed in cluster 1. But it still consumes all the messages. Is there anything I am missing here?

Upvotes: 8

Views: 10122

Answers (4)

Inako

Reputation: 399

Kafka 2.7 introduced "automated consumer offset sync". By default, consumer offsets are not synced between clusters; you have to enable this feature explicitly.

KIP-545: support automated consumer offset sync across clusters in MM 2.0
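On Kafka 2.7 or later, enabling this might look roughly as follows in mm2.properties. The flow prefix matches the question's source->dest setup; treat the exact values as a sketch, not a verified configuration:

```properties
# Enable automated consumer offset sync (Kafka 2.7+, KIP-545)
source->dest.sync.group.offsets.enabled = true
# How often translated offsets are written to the target cluster's
# __consumer_offsets topic (the default is 60 seconds)
source->dest.sync.group.offsets.interval.seconds = 60
```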

Upvotes: 4

macedonianmeasure

Reputation: 23

My data is being replicated as expected. Source topic gets created in the destination cluster as source.<TOPIC>. But, the consumer group offset is not being replicated.

By default, MM2 won't replicate consumer groups created by kafka-console-consumer. In the MM2 startup logs we can see that groups.blacklist = [console-consumer-.*, connect-.*, __.*]. I believe you can override this in your mm2.properties configuration file.
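A sketch of such an override in mm2.properties might look like this (keeping the other default exclusions but dropping console-consumer-.*; I haven't verified the exact key for every MM2 version):

```properties
# Assumed override: mirror console-consumer groups too,
# but keep excluding Connect and internal groups
source->dest.groups.blacklist = connect-.*, __.*
```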

Since I use the same consumer group, I was expecting my offsets also to be synced and won't consume the same messages which I consumed in cluster 1.

Once the consumer groups are properly being mirrored and the checkpoints are enabled, there should be an internal topic that is automatically created in your destination cluster (something like dest.checkpoints.internal). This checkpoint topic contains the last committed offsets in the source and destination clusters for mirrored topic partitions in each consumer group.

Then you can use Kafka’s RemoteClusterUtils utility class to translate these offsets and get the synced offsets for source.test-1 that map to the consumer's last committed offsets for test-1. If you end up creating a consumer with Java, you can add the RemoteClusterUtils as a dependency to your project:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-mirror-client</artifactId>
    <version>2.4.0</version>
</dependency>

Otherwise, it is likely that you will have to write a tool that wraps RemoteClusterUtils.java to get the translated offsets. This functionality or something similar looks to be planned as part of a future release for MM2.
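Such a wrapper could be sketched roughly as follows. It needs a live target cluster to run, and the cluster alias, group id, and bootstrap address are taken from the question's setup rather than anything guaranteed:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class OffsetTranslator {
    public static void main(String[] args) throws Exception {
        // Connection properties for the *target* cluster (where the
        // checkpoints topic lives) - address from the question's setup
        Map<String, Object> targetProps = new HashMap<>();
        targetProps.put("bootstrap.servers", "localhost:9092");

        // Translate the group's last committed source offsets into
        // target-cluster offsets. "source" is the cluster alias from
        // mm2.properties, "test-1-group" the group from the question.
        Map<TopicPartition, OffsetAndMetadata> translated =
            RemoteClusterUtils.translateOffsets(
                targetProps, "source", "test-1-group", Duration.ofSeconds(30));

        // e.g. source.test-1-0 -> offset in the target cluster matching
        // the group's progress on test-1 at the source
        translated.forEach((tp, offset) ->
            System.out.printf("%s -> %d%n", tp, offset.offset()));
    }
}
```

A consumer on the target cluster could then seek() to each translated offset before polling.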

Upvotes: 2

Sanguine

Reputation: 11

I see that your checkpoint configurations are

emit.checkpoints.enabled = true 
emit.checkpoints.interval.seconds = 60 

So your checkpoints topic will reflect the new changes only after 60 seconds. If you try immediately it won't work, so try again after a minute.

Upvotes: 1

radai

Reputation: 24202

There are several fundamental reasons why replicating offsets is non-trivial:

  1. Kafka is an at-least-once system (ignoring the hype). This means that MirrorMaker, because it's built on top of Kafka consumers and producers that can each time out or disconnect, will deliver some degree of duplicate records to the destination. So offsets don't map 1:1 between source and destination. Even if you were to try and use the "exactly once" support (which the MM2 KIP clearly says it is not using), all it would do is skip over partially-delivered batches, but those batches would still occupy offsets at the destination.
  2. If you set up mirroring long after the source topic has started expiring records, your destination topic will start at offset 0 while the source will have much higher "oldest" offsets. There has been an attempt to address this (see KIP-391), but it was never accepted.
  3. In general there's no guarantee that your mirroring topology mirrors from a single source to a single destination. The LinkedIn topology, for example, mirrors from multiple source clusters into "aggregate" tier clusters. Mapping offsets is meaningless for such topologies.

Looking at the MM2 KIP, there's an "offset sync topic" mentioned. In your code you can use the RemoteClusterUtils class to translate checkpoints between clusters:

// translateOffsets takes the target cluster's connection properties, the
// source cluster alias, the group id, and a timeout; it returns
// Map<TopicPartition, OffsetAndMetadata>
Map<TopicPartition, OffsetAndMetadata> newOffsets = RemoteClusterUtils.translateOffsets(
    newClusterProperties, oldClusterName, consumerGroupId, Duration.ofSeconds(30));
// seek() takes a single partition and offset, so apply each translated offset
newOffsets.forEach(consumer::seek);

This is adapted from the following presentation - https://www.slideshare.net/ConfluentInc/disaster-recovery-with-mirrormaker-20-ryanne-dolan-cloudera-kafka-summit-london-2019

Alternatively, you could use the seek-by-timestamp API to start your consumer group on the destination at roughly the time the data was delivered to the destination (or delivered to the source, if the destination broker's log-append-timestamp settings don't overwrite those times). You'd need to rewind a little for safety.
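That approach can be sketched roughly as follows; it needs a live target cluster to run, and the topic, group, partition, and one-minute safety rewind are assumptions drawn from the question, not recommendations:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class SeekByTimestamp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // target cluster
        props.put("group.id", "test-1-group");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Rewind a little before the point where the source consumer stopped:
        // at-least-once means re-reading is safer than skipping
        long target = System.currentTimeMillis() - Duration.ofMinutes(1).toMillis();

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("source.test-1", 0);
            consumer.assign(List.of(tp));

            Map<TopicPartition, Long> query = new HashMap<>();
            query.put(tp, target);

            // offsetsForTimes returns the earliest offset whose timestamp is
            // >= the requested time (null entry if no such record exists)
            Map<TopicPartition, OffsetAndTimestamp> result =
                consumer.offsetsForTimes(query);
            OffsetAndTimestamp ot = result.get(tp);
            if (ot != null) {
                consumer.seek(tp, ot.offset());
            }
            // ... then poll() from roughly the right position
        }
    }
}
```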

Upvotes: 4
