Reputation: 51
My team is building a CDC service with the Debezium embedded connector. For the offset storage we're thinking about using S3/DynamoDB. Has anyone here written something similar to externalize the offset store, and if so, what did you choose and why?
Upvotes: 5
Views: 2699
Reputation: 121
There is now a Redis offset storage module, debezium-storage-redis, which you can pull in with this dependency:
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-storage-redis</artifactId>
    <version>${debezium.redis.version}</version>
</dependency>
See: https://debezium.io/blog/2022/11/10/debezium-2-1-alpha1-released/
For configuration options: https://debezium.io/documentation/reference/stable/operations/debezium-server.html#debezium-source-configuration-properties
Sample demo: https://github.com/HimanshBhatnagar/spring-boot-debezium
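If you run the embedded engine rather than Debezium Server, the rough idea is to point offset.storage at the store class shipped in that module. This is only a sketch: the offset.storage.redis.* property names are my assumption based on the linked docs (which describe the debezium.source.-prefixed Debezium Server form), so verify them against the version you use.

import java.util.Properties

// Minimal sketch of embedded-engine properties that delegate offset storage to the
// Redis-backed store from debezium-storage-redis. The offset.storage.redis.* keys
// are assumptions -- check them against the documentation linked above.
fun redisOffsetProps(): Properties = Properties().apply {
    setProperty("name", "my-embedded-engine")
    setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
    // Fully-qualified class name of the Redis offset backing store in the module
    setProperty("offset.storage", "io.debezium.storage.redis.offset.RedisOffsetBackingStore")
    setProperty("offset.storage.redis.address", "localhost:6379")
    setProperty("offset.flush.interval.ms", "10000")
    // ... plus your usual database.* connector settings
}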
Upvotes: 0
Reputation: 11
Had this challenge recently. You can write a custom class that implements the org.apache.kafka.connect.storage.OffsetBackingStore interface, or extend an existing implementation such as org.apache.kafka.connect.storage.MemoryOffsetBackingStore (FileOffsetBackingStore works the same way).
Then make sure the "offset.storage" config is set to the fully-qualified class name of your store.
Below is a sample using Redis (probably not what you'd run in production) as a backing store, to give you an idea of how this can work.
package com.sample.cdc.offsetbackingstore

import com.sample.cdc.service.RedisManager
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.runtime.WorkerConfig
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore
import java.io.IOException
import java.nio.ByteBuffer
import java.util.concurrent.Callable
import java.util.concurrent.Future

class RedisOffsetBackingStore : MemoryOffsetBackingStore() {

    lateinit var redisManager: RedisManager
    lateinit var redisHost: String
    lateinit var redisPort: String

    override fun configure(config: WorkerConfig?) {
        super.configure(config)
        // Custom keys are not part of WorkerConfig's ConfigDef, so read them
        // from the raw originals instead of getString()
        redisHost = config?.originals()?.get("custom.config.redis.host")?.toString()
            ?: throw ConnectException("custom.config.redis.host is not set")
        redisPort = config?.originals()?.get("custom.config.redis.port")?.toString()
            ?: throw ConnectException("custom.config.redis.port is not set")
    }

    // Called by the Debezium engine on startup
    override fun start() {
        super.start()
        println("Initializing redis manager...")
        redisManager = RedisManager(redisHost, redisPort)
    }

    // Called by the Debezium engine during graceful shutdown
    override fun stop() {
        super.stop()
        println("Disposing redis client resources...")
        if (this::redisManager.isInitialized)
            redisManager.dispose()
    }

    // Called by the engine's OffsetReader to read offsets
    override fun get(keys: MutableCollection<ByteBuffer>?): Future<MutableMap<ByteBuffer, ByteBuffer?>> {
        // Serve from the in-memory map once it has been populated
        if (data.isNotEmpty())
            return super.get(keys)
        // First read after startup: load the requested offsets from Redis
        return executor.submit(Callable<MutableMap<ByteBuffer, ByteBuffer?>> {
            val result: MutableMap<ByteBuffer, ByteBuffer?> = HashMap()
            keys?.forEach {
                val offsetKey = String(it.array())
                val offsetValue = redisManager.get(offsetKey)
                if (offsetValue.isNotEmpty()) {
                    val buffer = ByteBuffer.wrap(offsetValue.toByteArray())
                    result[it] = buffer
                    data[it] = buffer
                }
            }
            result
        })
    }

    // Invoked by set() in MemoryOffsetBackingStore to persist offsets
    // during commit or graceful shutdown
    override fun save() {
        try {
            for ((key, value) in data) {
                val offsetKey = String(key!!.array())
                val offsetValue = String(value!!.array())
                redisManager.save(offsetKey, offsetValue)
            }
        } catch (e: IOException) {
            throw ConnectException(e)
        }
    }
}
// Ensure the config settings below are present in your Debezium config:
//"offset.storage":"com.sample.cdc.offsetbackingstore.RedisOffsetBackingStore",
//"custom.config.redis.host": "localhost"
//"custom.config.redis.port": "6379"
Note: If you run multiple standalone embedded Debezium services (for reliability and fault tolerance) against a custom offset backing store, you'll also have to handle offset race conditions and event deduplication yourself.
Upvotes: 1
Reputation: 96
We have a Postgres DB as the source. Change Data Capture (CDC) is implemented by Postgres itself (via the pglogical extension), and this CDC subsystem is responsible for offset management. Postgres maintains a list of CDC clients (replication slots): when your client reconnects on the same slot, the DB resumes from the point where that client last disconnected, while a new client creates a new slot and only receives the CDC records produced from that point in time on. So there is no need for us to remember the offsets ourselves.
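As a quick way to see this server-side bookkeeping, here is a sketch that inspects the slots' confirmed positions. It assumes the standard PostgreSQL JDBC driver on the classpath and a logical slot already created by your CDC client; the connection URL and credentials are placeholders:

import java.sql.DriverManager

// Minimal sketch: a logical slot's confirmed_flush_lsn is the position Postgres
// will resume from for that client, so no client-side offset store is needed.
fun main() {
    DriverManager.getConnection("jdbc:postgresql://localhost:5432/mydb", "postgres", "secret").use { conn ->
        conn.createStatement().use { stmt ->
            stmt.executeQuery(
                "SELECT slot_name, plugin, confirmed_flush_lsn FROM pg_replication_slots"
            ).use { rs ->
                while (rs.next()) {
                    println("${rs.getString("slot_name")} (${rs.getString("plugin")}): " +
                        rs.getString("confirmed_flush_lsn"))
                }
            }
        }
    }
}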
Upvotes: 1