Reputation: 440
I'm trying to set up replication between 2 clusters but don't want want the topic names to be changed. for example if i have a topic called "some_topic" it is automatically replicated to "cluster1.some_topic", I'm pretty sure this can be done but haven't found the correct config to change this
My current config "mirrormaker2.properties"
# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
# specify any number of cluster aliases
clusters = cluster1, cluster2
# connection information for each cluster
cluster1.bootstrap.servers = host1:9092,host2:9092,host3:9092
cluster2.bootstrap.servers = rep_host1:9092,rep_host2:9092,rep_host3:9092
# enable and configure individual replication flows
cluster1->cluster2.enabled = true
cluster1->cluster2.topics = sometopic.*
# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
for reference:
Upvotes: 18
Views: 16585
Reputation: 193
Starting with Kafka 3.0.0, it is sufficient to set
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
Also, the PrefixlessReplicationPolicy in marcin-wieloch's answer https://stackoverflow.com/a/60619233/12008693 no longer works with 3.0.0 (NullPointerException).
Upvotes: 17
Reputation: 31
I'm trying to set up replication between 2 clusters, but need the same topic name in both the cluster without giving an alias for the in the connect-mirror-maker.properties.
By default, replicated topics are renamed based on source cluster aliases.
Source --> Target
topic-1 --> source.topic-1
You can avoid topics being renamed by setting the following properties to blank under your connector properties file. By default, replication.policy.separator property is a period, then by setting it to blank along with the source.cluster.alias, the target topic will have the same name as the source topic.
replication.policy.separator=
source.cluster.alias=
target.cluster.alias=
Upvotes: 3
Reputation: 51
Managed to push replication with Kafka ConfluentINC Connector image release 5.4.2 Properties are:
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
target.cluster.alias=
replication.factor=3
tasks.max=3
topics=.*
source.cluster.alias=
target.cluster.bootstrap.servers=<broker1>,<broker2>,<broker3>
replication.policy.separator=
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
source.cluster.bootstrap.servers=<broker1>,<broker2>,<broker3>
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
1) Leave space after the 3 parameters: source.cluster.alias, replication.policy.separator, target.cluster.alias.
2) Set this mirror connector on the TARGET Kafka, NOT on the source (performs pull only)
In addition yo can use Conductor or Kafka Connector UI landoop image - landoop/kafka-connect-ui
This is still in a testing scenario, but it looks promising.
Upvotes: 5
Reputation: 141
To 'disable' topic prefixes and to have topic properties mirrored properly at the same time, I had to provide a customized replication policy which also overrides the topicSource
method. Otherwise non-default topic properties (e.g., "cleanup.policy=compact"
) have not been mirrored, even after restarting mirror maker.
Here is the complete procedure that worked for me:
public class PrefixlessReplicationPolicy extends DefaultReplicationPolicy {
private static final Logger log = LoggerFactory.getLogger(PrefixlessReplicationPolicy.class);
private String sourceClusterAlias;
@Override
public void configure(Map<String, ?> props) {
super.configure(props);
sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
if (sourceClusterAlias == null) {
String logMessage = String.format("Property %s not found", MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
log.error(logMessage);
throw new RuntimeException(logMessage);
}
}
@Override
public String formatRemoteTopic(String sourceClusterAlias, String topic) {
return topic;
}
@Override
public String topicSource(String topic) {
return topic == null ? null : sourceClusterAlias;
}
@Override
public String upstreamTopic(String topic) {
return null;
}
}
${KAFKA_HOME/libs
directoryreplication.policy.class
property in ${KAFKA_HOME}/config/mm2.properties
: replication.policy.class=ch.mawileo.kafka.mm2.PrefixlessReplicationPolicy
Upvotes: 14
Reputation: 99
I think the answer above is inappropriate.
In Mirror Maker 2.0, if you want to keep the topic unmodified, you have to implement ReplicationPolicy.
You can refer to DefaultReplicationPolicy.class, and then override formatRemoteTopic()
, after that you have to remove sourceClusterAlias + separator
. In the end, configure replication.policy.class
in the mm2.properties
I defined MigrationReplicationPolicy.class
replication.policy.class = org.apache.kafka.connect.mirror.MigrationReplicationPolicy
You should see MirrorClientConfig,class
, I know that you will understand
Upvotes: 6
Reputation: 131
I was able to remove the prefix using this setup:
"replication.policy.separator": ""
"source.cluster.alias": "",
"target.cluster.alias": "",
If the alias setting is necessary in your case, I understand you should use other replicationPolicy class. By default is using DefaultReplicationPolicy class (https://kafka.apache.org/24/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html)
Upvotes: 10