Reputation: 21
I need to implement a CDC pattern but can't get it to work: my Debezium worker is up and running, but my connectors still fail, despite my efforts.
I've tested a simple "watch" on my MongoDB cluster and it works:
// Open a change stream on the collection and print events as they arrive
watchCursor = db.mydb.watch()
while (!watchCursor.isExhausted()) {
    if (watchCursor.hasNext()) {
        print(watchCursor.next());
    }
}
So I can tell that my user has the rights to watch change streams on the cluster.
I still get an error when running my tasks:
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "10.114.129.247:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:115)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: io.debezium.DebeziumException: org.apache.kafka.connect.errors.ConnectException: Error while attempting to get oplog position\n\tat io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:85)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:153)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:135)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:108)\n\t... 
5 more\nCaused by: org.apache.kafka.connect.errors.ConnectException: Error while attempting to get oplog position\n\tat io.debezium.connector.mongodb.MongoDbSnapshotChangeEventSource.lambda$establishConnectionToPrimary$3(MongoDbSnapshotChangeEventSource.java:234)\n\tat io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:292)\n\tat io.debezium.connector.mongodb.MongoDbSnapshotChangeEventSource.lambda$determineSnapshotOffsets$6(MongoDbSnapshotChangeEventSource.java:295)\n\tat java.base/java.util.HashMap$Values.forEach(HashMap.java:976)\n\tat io.debezium.connector.mongodb.ReplicaSets.onEachReplicaSet(ReplicaSets.java:115)\n\tat io.debezium.connector.mongodb.MongoDbSnapshotChangeEventSource.determineSnapshotOffsets(MongoDbSnapshotChangeEventSource.java:290)\n\tat io.debezium.connector.mongodb.MongoDbSnapshotChangeEventSource.doExecute(MongoDbSnapshotChangeEventSource.java:99)\n\tat io.debezium.connector.mongodb.MongoDbSnapshotChangeEventSource.doExecute(MongoDbSnapshotChangeEventSource.java:52)\n\tat io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)\n\t... 
8 more\nCaused by: com.mongodb.MongoQueryException: Query failed with error code 13 and error message 'not authorized on local to execute command { find: \"oplog.rs\", filter: {}, sort: { $natural: -1 }, limit: 1, singleBatch: true, $db: \"local\", lsid: { id: UUID(\"de332d4a-32ec-424f-ae09-b32376444d11\") }, $readPreference: { mode: \"primaryPreferred\" } }' on server rc1a-REDACTED.mdb.yandexcloud.net:27018\n\tat com.mongodb.internal.operation.FindOperation$1.call(FindOperation.java:663)\n\tat com.mongodb.internal.operation.FindOperation$1.call(FindOperation.java:653)\n\tat com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:583)\n\tat com.mongodb.internal.operation.FindOperation.execute(FindOperation.java:653)\n\tat com.mongodb.internal.operation.FindOperation.execute(FindOperation.java:81)\n\tat com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:184)\n\tat com.mongodb.client.internal.FindIterableImpl.first(FindIterableImpl.java:200)\n\tat io.debezium.connector.mongodb.MongoDbSnapshotChangeEventSource.lambda$determineSnapshotOffsets$5(MongoDbSnapshotChangeEventSource.java:297)\n\tat io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:288)\n\t... 15 more\n"
}
]
Simply put, I do not have rights on oplog.rs, even though I'm not using the oplog capture mode but change streams.
Here is my configuration:
{
"name": "account3-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"errors.log.include.messages": "true",
"transforms.unwrap.delete.handling.mode": "rewrite",
"mongodb.password": "REDACTED",
"transforms": "unwrap,idToKey,extractIdKey",
"capture.mode": "change_streams_update_full",
"collection.include.list": "account.account",
"mongodb.ssl.enabled": "false",
"transforms.idToKey.fields": "id",
"transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms.extractIdKey.field": "id",
"database.include.list": "account",
"errors.log.enable": "true",
"mongodb.hosts": "rc1a-REDACTED.mdb.yandexcloud.net:27018,rc1b-REDACTED.mdb.yandexcloud.net:27018,rc1c-REDACTED.mdb.yandexcloud.net:27018",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms.idToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"mongodb.user": "debezium",
"mongodb.name": "loyalty.raw.last",
"key.converter.schemas.enable": "false",
"internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
"name": "account3-connector",
"errors.tolerance": "all",
"transforms.extractIdKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key"
},
"tasks": [],
"type": "source"
}
Does anyone have an idea of what could be wrong?
I'm using Debezium 1.8.1 on the Confluent base image 6.2.0:
FROM confluentinc/cp-kafka-connect-base:6.2.0
Upvotes: 2
Views: 1807
Reputation: 1281
I ran into the same problem.
You need to add permission to read the oplog.rs collection in the local database.
During initialization, the connector tries to query the oplog even when using change streams.
This is the full stack trace:
com.mongodb.MongoQueryException: Query failed with error code 13 with name 'Unauthorized' and error message 'not authorized on local to execute command { find: \"oplog.rs\", filter: {}, sort: { $natural: -1 }, limit: 1, singleBatch: true, $db: \"local\", lsid: { id: UUID(\"51399eae-bb7a-4706-b35c-048e878d23a8\") }, $readPreference: { mode: \"primaryPreferred\" } }' on server pl-0-westeurope-azure.pi8eh.mongodb.net:1025
at com.mongodb.internal.operation.FindOperation.lambda$execute$1(FindOperation.java:699)
at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$2(OperationHelper.java:566)
at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:591)
at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$3(OperationHelper.java:565)
at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:591)
at com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:564)
at com.mongodb.internal.operation.FindOperation.lambda$execute$2(FindOperation.java:690)
at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:65)
at com.mongodb.internal.operation.FindOperation.execute(FindOperation.java:722)
at com.mongodb.internal.operation.FindOperation.execute(FindOperation.java:86)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:191)
at com.mongodb.client.internal.FindIterableImpl.first(FindIterableImpl.java:213)
at io.debezium.connector.mongodb.MongoUtil.getOplogEntry(MongoUtil.java:236)
at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$initializeOffsets$4(MongoDbStreamingChangeEventSource.java:306)
at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:317)
at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$initializeOffsets$5(MongoDbStreamingChangeEventSource.java:305)
at java.base/java.util.HashMap$Values.forEach(HashMap.java:977)
at io.debezium.connector.mongodb.ReplicaSets.onEachReplicaSet(ReplicaSets.java:120)
at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.initializeOffsets(MongoDbStreamingChangeEventSource.java:300)
at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:89)
at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:51)
at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:174)
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
The trace shows a call to MongoUtil.getOplogEntry, which queries the "oplog.rs" collection in the "local" database. My guess is that the user does not have permission to perform this query.
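To confirm this outside of Debezium, you can run the same query from the mongo shell while logged in as the connector's user. This is only a sketch: `latestOplogEntry` is a helper name I made up, and the query simply mirrors the `find` command shown in the error message. If the user lacks the permission, it should fail with the same "not authorized on local" error.

```javascript
// Hypothetical helper (not part of Debezium): reads the newest oplog entry,
// mirroring the find command from the error message
// (find: "oplog.rs", sort: { $natural: -1 }, limit: 1, $db: "local").
function latestOplogEntry(database) {
    return database
        .getSiblingDB("local")
        .getCollection("oplog.rs")
        .find({})
        .sort({ $natural: -1 })
        .limit(1)
        .toArray();
}

// `db` is the global database handle provided by the mongo shell.
// The guard lets the file load without error outside the shell.
if (typeof db !== "undefined") {
    printjson(latestOplogEntry(db));
}
```

Run it while authenticated as the same user the connector uses (debezium in the question), otherwise the check tells you nothing about the connector's rights.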
Adding permission to read the oplog.rs collection fixes the problem.
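For a self-managed replica set, one way to grant that permission is a custom role with only the `find` action on `local.oplog.rs`. This is a minimal sketch, not the only way to do it: the role name `oplogReader` is my own choice, and I am assuming the `debezium` user was created in the `admin` database and that you run this as a user allowed to create roles. Managed services may not allow `createRole` from the shell, in which case the role has to be assigned through the provider's own tooling.

```javascript
// Hypothetical role name; pick whatever fits your naming scheme.
const OPLOG_ROLE = "oplogReader";

// Builds a role document that allows only `find` on local.oplog.rs.
function oplogReaderRole(roleName) {
    return {
        role: roleName,
        privileges: [
            { resource: { db: "local", collection: "oplog.rs" }, actions: ["find"] },
        ],
        roles: [],
    };
}

// `db` is the global database handle provided by the mongo shell.
if (typeof db !== "undefined") {
    const admin = db.getSiblingDB("admin");
    admin.createRole(oplogReaderRole(OPLOG_ROLE));
    // Assumption: the "debezium" user is defined in the admin database.
    admin.grantRolesToUser("debezium", [{ role: OPLOG_ROLE, db: "admin" }]);
}
```

After granting the role, restart the failed connector task so that it reconnects with the new privileges.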
Upvotes: 0
Reputation: 1
As of December 2023 I still get this error. Debezium runs, but the topic does not show up in Kafka UI. I asked about this in the Debezium community; you can see it here
According to that thread, you can work around the problem by using the MongoDB connector from their homepage, but it will expose your password in Kafka UI or any other platform.
Edit 06/12/2023: I solved the problem based on the user's answer above and a few links below:
Upvotes: 0