Reputation: 65
I am using embedded Debezium in my Spring Boot service to process events that happen on my MongoDB database.
I am using the embedded-debezium, debezium-api and debezium-mongodb-connector dependencies in my service.
Everything works fine and I receive events whenever a document changes in any collection, but I have an issue with multi-instance deployment. If I deploy 3 instances of my service, I receive the same event in all 3 instances, which is a problem: I need to process each event only once.
So what would be a good approach to tackle this issue?
Upvotes: 0
Views: 682
Reputation: 573
I solved this problem using Hazelcast to lock the processing and to store the LSN (kind of a combination of the two previous answers). I used this to implement a transactional outbox mechanism.
import java.util.List;

import com.hazelcast.core.HazelcastInstance;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.kafka.core.KafkaTemplate;

@Slf4j                   // Lombok assumed for the logger and constructor injection
@RequiredArgsConstructor
public class OutboxPublisher implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final LockRegistry lockRegistry;
    private final HazelcastInstance hazelcastInstance;

    @Override
    public void handleBatch(
            final List<RecordChangeEvent<SourceRecord>> records,
            final DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer
    ) throws InterruptedException {
        // Only one instance at a time is allowed to process a batch
        lockRegistry.executeLocked("my_lock_context", () -> {
            // Last processed outbox record id, shared across instances via Hazelcast CP
            long lsn = hazelcastInstance.getCPSubsystem()
                    .getAtomicLong("lsn")
                    .get();
            for (RecordChangeEvent<SourceRecord> record : records) {
                this.execute(record, committer, lsn);
            }
            committer.markBatchFinished();
        });
    }

    private void execute(final RecordChangeEvent<SourceRecord> evt,
                         final DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer,
                         final long lsn
    ) throws InterruptedException {
        final SourceRecord sourceRecord = evt.record();
        // outbox table record id (assumes the record value is the unwrapped outbox row)
        final Long id = ((Struct) sourceRecord.value()).getInt64("ID");
        if (id <= lsn) {
            log.info("Event already processed. Skipping.");
            committer.markProcessed(evt);
            return;
        }
        // do the processing
        committer.markProcessed(evt);
        // remember the highest processed id so that duplicates can be skipped
        hazelcastInstance.getCPSubsystem().getAtomicLong("lsn").set(id);
    }
}
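The consumer above is registered with the embedded engine through the DebeziumEngine builder. A minimal sketch of that wiring, assuming connectorProps holds the MongoDB connector configuration and the beans from the class above are available:

// Sketch only: plug the ChangeConsumer into the embedded engine and run it on its own thread
DebeziumEngine<RecordChangeEvent<SourceRecord>> engine = DebeziumEngine
        .create(ChangeEventFormat.of(Connect.class))
        .using(connectorProps)
        .notifying(new OutboxPublisher(kafkaTemplate, lockRegistry, hazelcastInstance))
        .build();
Executors.newSingleThreadExecutor().execute(engine); // DebeziumEngine implements Runnable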
Upvotes: 1
Reputation:
You can assign a specific profile to only one of the instances. With this profile you can conditionally enable the Debezium beans using the @Profile("cdc") annotation, so that Debezium runs in only one of the application instances. However, if this instance crashes, you need to ensure that a new one spins up with the 'cdc' profile.
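A minimal sketch of such a configuration class, assuming a Debezium connector configuration bean exists elsewhere (class and bean names are illustrative, and the engine still has to be handed to an Executor to start streaming):

@Configuration
@Profile("cdc") // only the instance started with the 'cdc' profile creates the Debezium beans
class DebeziumCdcConfiguration {

    @Bean(destroyMethod = "close")
    DebeziumEngine<ChangeEvent<String, String>> debeziumEngine(io.debezium.config.Configuration connectorConfig) {
        return DebeziumEngine.create(Json.class)
                .using(connectorConfig.asProperties())
                .notifying(event -> {
                    // handle the change event
                })
                .build();
    }
}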
Another solution is to use distributed locks, which allow coordination between instances of the same application. One way to utilise locking is to acquire a lock before a message is processed, preventing other instances from processing it at the same time; after handling the event, you release the lock.
@ServiceActivator(inputChannel = "debeziumOutputChannel")
public void processDebeziumEvent(Message<ChangeEvent<String, String>> message) {
    Lock lock = lockRegistry.obtain("my-lock-key");
    if (lock.tryLock()) {
        try {
            // Process the Debezium event
        } finally {
            lock.unlock();
        }
    } else {
        // Another instance is already processing the event
    }
}
Locking can also be achieved at a higher level: an instance can attempt to lock the whole CDC process on startup. One thing to keep in mind is that when the instance shuts down or crashes, it should release the lock. The same can be done at the bean level with Spring lifecycle callbacks, for example as sketched below.
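A rough sketch of that idea with Spring's SmartLifecycle, assuming a LockRegistry-backed lock and a DebeziumEngine bean defined elsewhere (all names here are illustrative):

// Sketch only: the instance that wins the lock runs the CDC process; the lock is released on shutdown
@Slf4j
@Component
@RequiredArgsConstructor
class CdcLeadership implements SmartLifecycle {

    private final LockRegistry lockRegistry;
    private final DebeziumEngine<?> debeziumEngine;    // assumed to be defined elsewhere
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    private Lock cdcLock;
    private volatile boolean running;

    @Override
    public void start() {
        cdcLock = lockRegistry.obtain("cdc-leader");
        if (cdcLock.tryLock()) {               // only the lock holder starts Debezium
            executor.execute(debeziumEngine);  // DebeziumEngine implements Runnable
            running = true;
        }
    }

    @Override
    public void stop() {
        if (running) {
            try {
                debeziumEngine.close();        // stop streaming
            } catch (IOException e) {
                log.warn("Failed to close Debezium engine", e);
            }
            cdcLock.unlock();                  // allow another instance to take over
            running = false;
        }
        executor.shutdown();
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}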
By the way, Spring offers support for Debezium via Spring Integration.
Upvotes: 0
Reputation: 21
Just throwing it out there: you can maintain a record of the last processed LSN and check whether the received log entry has an LSN that was already seen before.
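In code the idea boils down to something like the following sketch, where lastLsnStore, extractLsn and handle are placeholders for however you persist the last LSN and process events:

// Sketch only: skip any event whose LSN is not newer than the last one recorded
void onEvent(ChangeEvent<String, String> event) {
    long lsn = extractLsn(event);       // hypothetical: read the LSN / resume token from the event
    if (lsn <= lastLsnStore.get()) {
        return;                         // seen before, skip it
    }
    handle(event);                      // hypothetical: the actual business logic
    lastLsnStore.set(lsn);              // remember the newest LSN for the next check
}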
Upvotes: 0