Reputation: 1721
We are trying to replicate our offsets between 2 data centers. For a single consumer was really easy, adding only:
consumer.interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor
Now we have an app that is using kafka-streams. And after tying multiple things we cannot replicate offsets as the one before. For example we have tried as well:
kafka.streams.properties.consumer.interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor
but no luck Thanks!
Upvotes: 0
Views: 546
Reputation: 1721
In the end Kafka Streams removes group.id parameter. However the replicator still needs it.
override fun configure(configs: Map<String, *>?) {
val newConfigs = configs!! + mapOf("group.id" to configs?.get("customgroupid"))
super.configure(newConfigs)
}
}
Then add this to your .properties file:
kafka.streams.consumer.customgroupid=${kafka.streams.group.id} # customgroupid is not removed
kafka.streams.consumer.interceptor.classes=com.myjob.mymodule.ConsumerTimestampsInterceptorConfigurator
Upvotes: 0
Reputation: 191983
Try doing it in code rather than a property file
For example, the producer
config.put(
StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerTimestampsInterceptor.class.getName()
);
Or, make sure kafka.streams.properties
is the correct property prefix for creating all the StreamsConfig properties
Upvotes: 1