drexler

Reputation: 51

External offset store with the debezium embedded connector

My team is building a CDC service with the Debezium embedded connector. For the offset storage we're thinking about using S3/DynamoDB. Just wondering if anyone here has written something similar to externalize the offset store, what they chose, and why.

Upvotes: 5

Views: 2699

Answers (3)

Himanshu Bhatnagar

Reputation: 121

There is now a debezium-storage-redis module which you can use, with the following dependency:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-storage-redis</artifactId>
    <version>${debezium.redis.version}</version>
</dependency>

See: https://debezium.io/blog/2022/11/10/debezium-2-1-alpha1-released/

For configuration options: https://debezium.io/documentation/reference/stable/operations/debezium-server.html#debezium-source-configuration-properties

Sample demo: https://github.com/HimanshBhatnagar/spring-boot-debezium
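
For the embedded engine, wiring this up could look roughly like the sketch below. Treat it as a minimal, unverified sketch: the RedisOffsetBackingStore class name and the offset.storage.redis.* property names are assumptions based on the linked blog post and documentation, so check them against your Debezium version.

// Sketch: embedded engine configured with the Redis offset store from
// debezium-storage-redis. Class and property names are assumptions taken from
// the links above; verify them for your Debezium version.
import io.debezium.engine.ChangeEvent
import io.debezium.engine.DebeziumEngine
import io.debezium.engine.format.Json
import java.util.Properties

fun buildEngine(): DebeziumEngine<ChangeEvent<String, String>> {
    val props = Properties().apply {
        setProperty("name", "cdc-engine")
        setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
        // Redis-backed offsets instead of the default file-based store
        setProperty("offset.storage", "io.debezium.storage.redis.offset.RedisOffsetBackingStore")
        setProperty("offset.storage.redis.address", "localhost:6379")
        setProperty("offset.flush.interval.ms", "10000")
        // ...connector/database connection properties omitted...
    }
    return DebeziumEngine.create(Json::class.java)
        .using(props)
        .notifying { record -> println(record.value()) }
        .build()
}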

Upvotes: 0

Pophils

Reputation: 11

Had this challenge recently. You can write a custom class that implements org.apache.kafka.connect.storage.OffsetBackingStore, or simply extend org.apache.kafka.connect.storage.MemoryOffsetBackingStore (or its file-based subclass FileOffsetBackingStore). Then ensure the "offset.storage" config is set to the fully-qualified class name of your store.

Please see the sample below, which uses Redis (perhaps not what you'd use in production) as a backing store, to give you an idea of how this can work.

package com.sample.cdc.offsetbackingstore

import com.sample.cdc.service.RedisManager
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.runtime.WorkerConfig
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore
import java.io.IOException
import java.nio.ByteBuffer
import java.util.concurrent.Callable
import java.util.concurrent.Future


class RedisOffsetBackingStore : MemoryOffsetBackingStore() {
    lateinit var redisManager : RedisManager
    lateinit var redisHost : String
    lateinit var redisPort : String

    override fun configure(config: WorkerConfig?) {
        super.configure(config)

        // Fail fast if the custom Redis settings are missing
        redisHost = config?.getString("custom.config.redis.host")
            ?: throw ConnectException("custom.config.redis.host is not set")
        redisPort = config?.getString("custom.config.redis.port")
            ?: throw ConnectException("custom.config.redis.port is not set")
    }

    // Called by the Debezium engine when the offset store is started
    override fun start() {
        super.start()
        println("Initializing redis manager...")
        redisManager = RedisManager(redisHost, redisPort)
    }
    
    // Called by Debezium Engine during graceful shutdown
    override fun stop() {
        super.stop()
        println("Disposing redis client resources...")
        if(this::redisManager.isInitialized)
            redisManager.dispose()
    }
 
    // Called by DebeziumEngine OffsetReader to read Offset
    override fun get(keys: MutableCollection<ByteBuffer>?): Future<MutableMap<ByteBuffer, ByteBuffer?>> {
        if(data.isNotEmpty())
            return super.get(keys)
        return executor.submit(Callable<MutableMap<ByteBuffer, ByteBuffer?>> {
            val result: MutableMap<ByteBuffer, ByteBuffer?> = HashMap()
            keys?.forEach {
                val offsetKey = String(it.array())
                val offsetValue = redisManager.get(offsetKey)
                if(offsetValue.isNotEmpty()){
                    val buffer = ByteBuffer.wrap(offsetValue.toByteArray())
                    result[it] = buffer
                    data[it] = buffer
                }
            }
            result
        })
    }
    
    // Invoked by set() in MemoryOffsetBackingStore class to persist Offset  
    // during commit or graceful shutdown
    override fun save() {
        try {
            for ((key, value) in data) {
                val offsetKey = String(key!!.array())
                val offsetValue = String(value!!.array())
                redisManager.save(offsetKey, offsetValue)
            }
        } catch (e: IOException) {
            throw ConnectException(e)
        }
    }
}

//Ensure the below config settings are set in DebeziumConfig
//"offset.storage": "com.sample.cdc.offsetbackingstore.RedisOffsetBackingStore",
//"custom.config.redis.host": "localhost",
//"custom.config.redis.port": "6379"

Note: In the case of multiple standalone embedded Debezium services (for reliability and fault tolerance) with a custom offset backing store, you'll have to provide a way to handle offset race conditions and event deduplication.
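
As a rough illustration of the event-deduplication part, a hypothetical sketch (not from the original answer): the consumer can remember the highest source position (e.g. LSN) it has processed per table and drop anything that is not strictly newer. Class and method names here are made up for illustration.

import java.util.concurrent.ConcurrentHashMap

// Hypothetical consumer-side deduplication: track the highest processed source
// position per table and reject events that are not strictly newer.
class EventDeduplicator {
    private val lastPositionByTable = ConcurrentHashMap<String, Long>()

    // Returns true if the event should be processed, false if it is a duplicate/replay.
    fun shouldProcess(table: String, position: Long): Boolean {
        while (true) {
            val previous = lastPositionByTable[table]
            if (previous != null && position <= previous) return false // already handled
            val updated = if (previous == null)
                lastPositionByTable.putIfAbsent(table, position) == null
            else
                lastPositionByTable.replace(table, previous, position)
            if (updated) return true
            // lost a race with a concurrent writer; re-read and retry
        }
    }
}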

Upvotes: 1

Jan Rudert

Reputation: 96

We have a Postgres DB as the source. Change Data Capture (CDC) is implemented by Postgres itself (via the pglogical extension). This CDC subsystem of Postgres is responsible for offset management: it maintains a list of CDC clients (aka replication slots). So if your client opens a CDC connection on the same slot, the DB resumes from the point where that client disconnected before. A new client creates a new slot and receives only the CDC records created from that point in time onward. So there is no need for us to remember the offsets.
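
To make the slot behaviour concrete, here is a small hypothetical sketch (not part of the original setup; connection details are made up) that lists the replication slots Postgres keeps for its CDC clients over plain JDBC. confirmed_flush_lsn is the position a slot's client has acknowledged, i.e. the "offset" Postgres remembers on the client's behalf.

import java.sql.DriverManager

// Hypothetical sketch: inspect the logical replication slots Postgres maintains.
fun main() {
    val url = "jdbc:postgresql://localhost:5432/app" // illustrative connection details
    DriverManager.getConnection(url, "cdc_user", "secret").use { conn ->
        conn.createStatement().use { st ->
            val rs = st.executeQuery(
                "SELECT slot_name, plugin, confirmed_flush_lsn FROM pg_replication_slots"
            )
            while (rs.next()) {
                // Each slot resumes from confirmed_flush_lsn when its client reconnects
                println("${rs.getString("slot_name")} ${rs.getString("plugin")} ${rs.getString("confirmed_flush_lsn")}")
            }
        }
    }
}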

Upvotes: 1
