Reputation: 337
According to the documentation, if you don't provide a value for collection, the connector reads from all collections:
"name of the collection in the database to watch for changes. If not set then all collections will be watched."
I checked the connector source code (excerpt further down) and it confirms this.
However, when the collection is not provided, I get an error like this:
ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186)
org.apache.kafka.connect.errors.ConnectException: com.mongodb.MongoCommandException: Command failed with error 73 (InvalidNamespace): '{aggregate: 1} is not valid for '$changeStream'; a collection is required.' on server localhost:27018. The full response is {"operationTime": {"$timestamp": {"t": 1603928795, "i": 1}}, "ok": 0.0, "errmsg": "{aggregate: 1} is not valid for '$changeStream'; a collection is required.", "code": 73, "codeName": "InvalidNamespace", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1603928795, "i": 1}}, "signature": {"hash": {"$binary": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "$type": "00"}, "keyId": {"$numberLong": "0"}}}}
This is my configuration file:
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=mongodb://localhost:27017,localhost:27018/order
database=order
collection=
topic.prefix=redemption
poll.max.batch.size=1000
poll.await.time.ms=5000
# Change stream options
pipeline=[]
batch.size=0
change.stream.full.document=updateLookup
collation=
copy.existing=true
errors.tolerance=all
If I set a collection, the connector works and generates topics.
Judging by the logs, the connector does appear to connect to the database:
INFO Watching for database changes on 'order' (com.mongodb.kafka.connect.source.MongoSourceTask:620)
Connector source code:
else if (collection.isEmpty()) {
  LOGGER.info("Watching for database changes on '{}'", database);
  MongoDatabase db = mongoClient.getDatabase(database);
  changeStream = pipeline.map(db::watch).orElse(db.watch());
} else
When I try this in my mongo shell, I get the following:
rs0:SECONDARY> db.watch()
2020-10-28T18:13:50.344-0600 E QUERY [thread1] TypeError: db.watch is not a function :
@(shell):1:1
rs0:SECONDARY> db.watch
test.watch
Upvotes: 0
Views: 1606
Reputation: 3182
You can listen to change streams from multiple MongoDB collections by providing a regex for the collection names in pipeline. You can also provide a regex for the database names if you have multiple databases to listen to.
"pipeline": "[{\"$match\":{\"$and\":[{\"ns.db\":{\"$regex\":/^database-name$/}},{\"ns.coll\":{\"$regex\":/^collections_.*/}}]}}]"
You can also use $nin to exclude any database whose change streams you don't want to listen to.
"pipeline": "[{\"$match\":{\"$and\":[{\"ns.db\":{\"$regex\":/^database-name$/,\"$nin\":[/^any_database_name$/]}},{\"ns.coll\":{\"$regex\":/^collections_.*/}}]}}]"
Here is the complete Kafka connector configuration.
Mongo to Kafka source connector
{
  "name": "mongo-to-kafka-connect",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "publish.full.document.only": "true",
    "tasks.max": "3",
    "key.converter.schemas.enable": "false",
    "topic.creation.enable": "true",
    "poll.await.time.ms": 1000,
    "poll.max.batch.size": 100,
    "topic.prefix": "any prefix for topic name",
    "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
    "connection.uri": "mongodb://<username>:<password>@ip:27017,ip:27017,ip:27017,ip:27017/?authSource=admin&replicaSet=xyz&tls=true",
    "value.converter.schemas.enable": "false",
    "copy.existing": "true",
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 3,
    "topic.creation.compacted.cleanup.policy": "compact",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "mongo.errors.log.enable": "true",
    "heartbeat.interval.ms": 10000,
    "pipeline": "[{\"$match\":{\"$and\":[{\"ns.db\":{\"$regex\":/^database-name$/}},{\"ns.coll\":{\"$regex\":/^collections_.*/}}]}}]"
  }
}
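The backslash-escaped quotes in the "pipeline" value are there because the pipeline is itself a string embedded in the JSON config. A minimal sketch of generating that escaping instead of writing it by hand is shown here; the helper name connector_config is hypothetical, and only two of the config keys above are included to keep it short.

```python
import json

# The pipeline as you would write it unescaped. The /.../ regex literals are
# kept as plain text, exactly as in the config above.
PIPELINE = (
    '[{"$match":{"$and":['
    '{"ns.db":{"$regex":/^database-name$/}},'
    '{"ns.coll":{"$regex":/^collections_.*/}}'
    ']}}]'
)


def connector_config(name: str, pipeline: str) -> str:
    """Return a JSON connector config with the pipeline embedded as a string.

    Hypothetical helper: json.dumps takes care of escaping the inner quotes.
    """
    config = {
        "name": name,
        "config": {
            "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
            "pipeline": pipeline,
        },
    }
    return json.dumps(config, indent=2)


if __name__ == "__main__":
    print(connector_config("mongo-to-kafka-connect", PIPELINE))
```

Parsing the printed JSON and reading config.pipeline gives back the unescaped pipeline text, which is a quick way to check that nothing was mangled.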
You can find more details in the official docs.
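To sanity-check which namespaces the $match filters above would let through, the patterns can be tried locally before deploying the connector. This is only a rough approximation of the server-side behaviour using Python's re module (the patterns here are simple enough to behave the same way); the function name namespace_matches is hypothetical.

```python
import re

# Patterns copied from the pipeline examples above.
DB_INCLUDE = re.compile(r"^database-name$")
DB_EXCLUDE = [re.compile(r"^any_database_name$")]
COLL_INCLUDE = re.compile(r"^collections_.*")


def namespace_matches(db: str, coll: str) -> bool:
    """Approximate the $match stage: ns.db and ns.coll must both pass.

    Uses re.search because an unanchored $regex matches anywhere in the
    string; the anchors in the patterns do the restricting.
    """
    if not DB_INCLUDE.search(db):
        return False
    # Rough equivalent of $nin with regex entries: reject if any pattern matches.
    if any(pattern.search(db) for pattern in DB_EXCLUDE):
        return False
    return COLL_INCLUDE.search(coll) is not None


if __name__ == "__main__":
    candidates = [
        ("database-name", "collections_orders"),
        ("database-name", "orders"),
        ("any_database_name", "collections_orders"),
    ]
    for db, coll in candidates:
        print(db, coll, namespace_matches(db, coll))
```

Only the first candidate passes: the second fails the collection pattern and the third fails the database pattern.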
Upvotes: 1
Reputation: 337
I was using MongoDB 3.6, which supports watching individual collections but not whole databases or deployments (instances), which is why I was getting those errors.
I found this in the documentation:
Starting in MongoDB 4.0, you can open a change stream cursor for a single database (excluding admin, local, and config database) to watch for changes to all its non-system collections.
https://docs.mongodb.com/manual/changeStreams/#watch-collection-database-deployment
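A small sketch of that version rule, in case it helps to check a server before leaving collection empty. The function name watch_scopes is hypothetical, and the version string is assumed to come from somewhere like db.version() in the mongo shell.

```python
def watch_scopes(server_version: str) -> set:
    """Return the change stream scopes a given MongoDB version supports.

    Encodes the rule quoted above: collection-level watch from 3.6,
    database- and deployment-level watch from 4.0.
    """
    major, minor = (int(part) for part in server_version.split(".")[:2])
    if (major, minor) >= (4, 0):
        return {"collection", "database", "deployment"}
    if (major, minor) >= (3, 6):
        return {"collection"}
    return set()


if __name__ == "__main__":
    for version in ("3.4.24", "3.6.20", "4.0.0"):
        print(version, sorted(watch_scopes(version)))
```

On a 3.6 server only "collection" is returned, which lines up with the InvalidNamespace error when the connector tries to watch the whole database.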
Upvotes: 1