Brendan Scullion

Reputation: 440

Is it possible to replicate kafka topics without alias prefix with MirrorMaker2

I'm trying to set up replication between two clusters, but I don't want the topic names to be changed. For example, a topic called "some_topic" is automatically replicated to "cluster1.some_topic". I'm pretty sure this can be avoided, but I haven't found the right config setting to change it.

My current config (mirrormaker2.properties):

# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties 

# specify any number of cluster aliases
clusters = cluster1, cluster2

# connection information for each cluster
cluster1.bootstrap.servers = host1:9092,host2:9092,host3:9092
cluster2.bootstrap.servers = rep_host1:9092,rep_host2:9092,rep_host3:9092

# enable and configure individual replication flows
cluster1->cluster2.enabled = true
cluster1->cluster2.topics = sometopic.*

# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5


Upvotes: 18

Views: 16585

Answers (6)

Gunther Vogel

Reputation: 193

Starting with Kafka 3.0.0, it is sufficient to set

replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy

Also note that the PrefixlessReplicationPolicy from Marcin Wieloch's answer (https://stackoverflow.com/a/60619233/12008693) no longer works with 3.0.0; it fails with a NullPointerException.

Upvotes: 17

Rohan Tiwari

Reputation: 31

I'm trying to set up replication between two clusters, but I need the same topic name in both clusters, without an alias prefix coming from connect-mirror-maker.properties.

By default, replicated topics are renamed based on source cluster aliases.

    Source --> Target
    topic-1 --> source.topic-1

You can avoid the renaming by setting the following properties to blank in your connector properties file. The replication.policy.separator property defaults to a period; setting it to blank, along with source.cluster.alias, gives the target topic the same name as the source topic.

replication.policy.separator=
source.cluster.alias=
target.cluster.alias=
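
As a rough illustration of why blanking these values works: the default policy builds the remote name by concatenating the source alias, the separator, and the topic. The sketch below only mimics that concatenation in plain Java; RemoteTopicNaming is a hypothetical stand-in written for this example, not a Kafka class.

```java
// Hypothetical stand-in that mimics how the default policy names remote topics:
// <source alias> + <separator> + <topic>. Not part of Kafka itself.
final class RemoteTopicNaming {

    static String formatRemoteTopic(String sourceClusterAlias, String separator, String topic) {
        return sourceClusterAlias + separator + topic;
    }

    public static void main(String[] args) {
        // Default behaviour: alias "cluster1" and separator "."
        System.out.println(formatRemoteTopic("cluster1", ".", "some_topic")); // cluster1.some_topic
        // Blank alias and blank separator: the name passes through unchanged
        System.out.println(formatRemoteTopic("", "", "some_topic")); // some_topic
    }
}
```

With both values blank there is nothing left in the name to tell a replicated topic from a local one, so this is probably best kept to one-way replication.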

Upvotes: 3

user13726200

Reputation: 51

I managed to get replication working with the Kafka Connect image from confluentinc, release 5.4.2. The properties are:

connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
target.cluster.alias= 
replication.factor=3
tasks.max=3
topics=.*
source.cluster.alias= 
target.cluster.bootstrap.servers=<broker1>,<broker2>,<broker3>
replication.policy.separator= 
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
source.cluster.bootstrap.servers=<broker1>,<broker2>,<broker3>
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

1) Leave a space after the = for these 3 parameters: source.cluster.alias, replication.policy.separator, target.cluster.alias.

2) Set this mirror connector up on the TARGET Kafka cluster, NOT on the source (it only pulls).

In addition, you can use Conductor or the Landoop Kafka Connect UI image (landoop/kafka-connect-ui).

This is still in a testing scenario, but it looks promising.

Upvotes: 5

Marcin Wieloch

Reputation: 141

To 'disable' topic prefixes and still have topic properties mirrored properly, I had to provide a customized replication policy that also overrides the topicSource method. Otherwise, non-default topic properties (e.g., "cleanup.policy=compact") were not mirrored, even after restarting MirrorMaker.

Here is the complete procedure that worked for me:

  1. Compile and package the following customized replication policy into a .jar file (full source code can be found here):
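package ch.mawileo.kafka.mm2;

import java.util.Map;

import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;
import org.apache.kafka.connect.mirror.MirrorConnectorConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrefixlessReplicationPolicy extends DefaultReplicationPolicy {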

  private static final Logger log = LoggerFactory.getLogger(PrefixlessReplicationPolicy.class);

  private String sourceClusterAlias;

  @Override
  public void configure(Map<String, ?> props) {
    super.configure(props);
    sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
    if (sourceClusterAlias == null) {
      String logMessage = String.format("Property %s not found", MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
      log.error(logMessage);
      throw new RuntimeException(logMessage);
    }
  }

  @Override
  public String formatRemoteTopic(String sourceClusterAlias, String topic) {
    return topic;
  }

  @Override
  public String topicSource(String topic) {
    return topic == null ? null : sourceClusterAlias;
  }

  @Override
  public String upstreamTopic(String topic) {
    return null;
  }
}
  2. Copy the .jar into the ${KAFKA_HOME}/libs directory
  3. Configure MirrorMaker 2 to use that replication policy by setting the replication.policy.class property in ${KAFKA_HOME}/config/mm2.properties:
  replication.policy.class=ch.mawileo.kafka.mm2.PrefixlessReplicationPolicy

Upvotes: 14

Zhou Yang

Reputation: 99

I think the answer above is inappropriate.

In MirrorMaker 2.0, if you want to keep the topic name unmodified, you have to implement your own ReplicationPolicy.

You can use DefaultReplicationPolicy.class as a reference: override formatRemoteTopic() so that it no longer prepends sourceClusterAlias + separator, then set replication.policy.class in mm2.properties.

I defined MigrationReplicationPolicy.class and configured it like this:

replication.policy.class = org.apache.kafka.connect.mirror.MigrationReplicationPolicy

Have a look at MirrorClientConfig.class; it should be clear from there.
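
A minimal sketch of the override described above, under some loud assumptions. To keep it compilable without Kafka on the classpath, TopicNamingPolicy is a hypothetical stand-in for the three core methods of ReplicationPolicy, and PrefixingPolicy only mimics the default "alias + separator + topic" naming. A real policy would implement org.apache.kafka.connect.mirror.ReplicationPolicy or extend DefaultReplicationPolicy instead. UnchangedNamePolicy is also made up for this example; it is not the MigrationReplicationPolicy mentioned above, whose source is not shown.

```java
// Hypothetical stand-in for the core methods of Kafka's ReplicationPolicy,
// so that this sketch compiles without any Kafka dependency.
interface TopicNamingPolicy {
    String formatRemoteTopic(String sourceClusterAlias, String topic);
    String topicSource(String topic);
    String upstreamTopic(String topic);
}

// Mimics the default naming scheme: "<alias><separator><topic>".
class PrefixingPolicy implements TopicNamingPolicy {
    private final String separator;

    PrefixingPolicy(String separator) {
        this.separator = separator;
    }

    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return sourceClusterAlias + separator + topic;
    }

    @Override
    public String topicSource(String topic) {
        // Everything before the first separator is treated as the source alias.
        int i = topic.indexOf(separator);
        return i < 0 ? null : topic.substring(0, i);
    }

    @Override
    public String upstreamTopic(String topic) {
        // Everything after the first separator is treated as the upstream topic.
        int i = topic.indexOf(separator);
        return i < 0 ? null : topic.substring(i + separator.length());
    }
}

// The override itself: drop "alias + separator" and return the topic as-is.
class UnchangedNamePolicy extends PrefixingPolicy {
    UnchangedNamePolicy(String separator) {
        super(separator);
    }

    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return topic;
    }
}

class PolicyDemo {
    public static void main(String[] args) {
        System.out.println(new PrefixingPolicy(".").formatRemoteTopic("cluster1", "some_topic"));     // cluster1.some_topic
        System.out.println(new UnchangedNamePolicy(".").formatRemoteTopic("cluster1", "some_topic")); // some_topic
    }
}
```

In this sketch the inherited topicSource still splits on the separator, so a topic whose own name contains "." would be read as if it came from another cluster. That may be part of why other answers here override topicSource and upstreamTopic as well.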

Upvotes: 6

Francisco L. Gualdi

Reputation: 131

I was able to remove the prefix using this setup:

"replication.policy.separator": "",
"source.cluster.alias": "",
"target.cluster.alias": ""

If you need the aliases to be set in your case, my understanding is that you should use a different ReplicationPolicy class. By default, the DefaultReplicationPolicy class is used (https://kafka.apache.org/24/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html).

Upvotes: 10
