zkropotkine

Reputation: 337

Use multiple collections with MongoDB Kafka Connector

According to the documentation, if you don't provide a value it will read from all collections:

"name of the collection in the database to watch for changes. If not set then all collections will be watched."

I looked at the connector source code and confirmed this:

https://github.com/mongodb/mongo-kafka/blob/k133/src/main/java/com/mongodb/kafka/connect/source/MongoSourceTask.java#L462

However, if the collection is not provided I get an error like this:

ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186)
org.apache.kafka.connect.errors.ConnectException: com.mongodb.MongoCommandException: Command failed with error 73 (InvalidNamespace): '{aggregate: 1} is not valid for '$changeStream'; a collection is required.' on server localhost:27018. The full response is {"operationTime": {"$timestamp": {"t": 1603928795, "i": 1}}, "ok": 0.0, "errmsg": "{aggregate: 1} is not valid for '$changeStream'; a collection is required.", "code": 73, "codeName": "InvalidNamespace", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1603928795, "i": 1}}, "signature": {"hash": {"$binary": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "$type": "00"}, "keyId": {"$numberLong": "0"}}}}

This is my configuration file:

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration
connection.uri=mongodb://localhost:27017,localhost:27018/order
database=order
collection=

topic.prefix=redemption
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options
pipeline=[]
batch.size=0
change.stream.full.document=updateLookup
collation=
copy.existing=true
errors.tolerance=all

If a collection is used, I'm able to use the connector and generate topics.
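
For comparison, the per-collection setup that works differs only in the collection line; a minimal sketch (the collection name below is just a placeholder):

# hypothetical collection name; with this set, the connector starts and creates topics
database=order
collection=redemption_order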

Looking at the logs, it appears the connector is connecting to the database:

INFO Watching for database changes on 'order' (com.mongodb.kafka.connect.source.MongoSourceTask:620)

Source code (the branch the connector takes when no collection is configured):

else if (collection.isEmpty()) {
  LOGGER.info("Watching for database changes on '{}'", database);
  MongoDatabase db = mongoClient.getDatabase(database);
  changeStream = pipeline.map(db::watch).orElse(db.watch());
} else

If I go to my mongo console, I get the following:

rs0:SECONDARY> db.watch()
2020-10-28T18:13:50.344-0600 E QUERY    [thread1] TypeError: db.watch is not a function :
@(shell):1:1
rs0:SECONDARY> db.watch
test.watch

Upvotes: 0

Views: 1606

Answers (2)

Keshav Lodhi

Reputation: 3182

You can listen to change streams from multiple MongoDB collections. You just need to provide a regex for the collection names in the pipeline; you can even provide a regex for the database names if you have multiple databases to listen to.

"pipeline": "[{\"$match\":{\"$and\":[{\"ns.db\":{\"$regex\":/^database-name$/}},{\"ns.coll\":{\"$regex\":/^collections_.*/}}]}}]"  

You can even exclude, using $nin, any database that you don't want to listen to for change streams.

"pipeline": "[{\"$match\":{\"$and\":[{\"ns.db\":{\"$regex\":/^database-name$/,\"$nin\":[/^any_database_name$/]}},{\"ns.coll\":{\"$regex\":/^collections_.*/}}]}}]"

Here is the complete Kafka connector configuration.

Mongo to Kafka source connector

{
  "name": "mongo-to-kafka-connect",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "publish.full.document.only": "true",
    "tasks.max": "3",
    "key.converter.schemas.enable": "false",
    "topic.creation.enable": "true",
    "poll.await.time.ms": 1000,
    "poll.max.batch.size": 100,
    "topic.prefix": "any prefix for topic name",
    "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
    "connection.uri": "mongodb://<username>:<password>@ip:27017,ip:27017,ip:27017,ip:27017/?authSource=admin&replicaSet=xyz&tls=true",
    "value.converter.schemas.enable": "false",
    "copy.existing": "true",
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 3,
    "topic.creation.compacted.cleanup.policy": "compact",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "mongo.errors.log.enable": "true",
    "heartbeat.interval.ms": 10000,
    "pipeline": "[{\"$match\":{\"$and\":[{\"ns.db\":{\"$regex\":/^database-name$/}},{\"ns.coll\":{\"$regex\":/^collections_.*/}}]}}]"
  }
}
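
If you are running a standalone worker with a .properties file, as in the question, the same pipeline should be expressible without the JSON string escaping; a minimal sketch, with placeholder regexes that you would adapt to your own database and collection names:

# hypothetical regexes; match the 'order' database and any collection starting with 'collections_'
pipeline=[{"$match": {"$and": [{"ns.db": {"$regex": "^order$"}}, {"ns.coll": {"$regex": "^collections_.*"}}]}}]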

You can get more details from the official docs.

Upvotes: 1

zkropotkine

Reputation: 337

I was using MongoDB 3.6, which supports watching collections but does not support watching databases or deployments (instances), which is why I was getting those errors.

I found this in the documentation:

Starting in MongoDB 4.0, you can open a change stream cursor for a single database (excluding admin, local, and config database) to watch for changes to all its non-system collections.

https://docs.mongodb.com/manual/changeStreams/#watch-collection-database-deployment
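
On a replica set running 4.0 or later, the database-level watch that the connector attempts works; a minimal shell sketch, adapted from the polling-loop example on that documentation page (run against the 'order' database; the events you see depend on your workload):

// assumes the replica set has been upgraded to MongoDB 4.0+
watchCursor = db.getSiblingDB("order").watch()
while (!watchCursor.isExhausted()) {
  if (watchCursor.hasNext()) {
    // change events from every non-system collection in 'order'
    printjson(watchCursor.next())
  }
}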

Upvotes: 1
