Selvakumar Ponnusamy

Reputation: 5563

Overriding Mongo Source connector

I'm going to use Kafka Connect to consume messages from MongoDB and publish them to a Kafka topic.

By default, the Mongo source connector creates one topic per collection. But I will have many collections and would like a single topic for all of them. Each message will carry its collection name.

  1. Is overriding the Mongo source connector the better approach? If so, what should I keep in mind?
  2. Is there an existing setting for this? I know that leaving the collection empty when creating the connector makes it listen to all collections, but it still creates a topic per collection.

Upvotes: 1

Views: 534

Answers (1)

Selvakumar Ponnusamy

Reputation: 5563

As @onecricketeer suggested, I used RegexRouter, so there is no need to override the Mongo source connector to publish documents from all the collections to the same topic.

This is my configuration. It listens to all collections matching the pipeline and publishes to mongodbTopic:

{
    "name": "mongo-source",
    "config": {
        "tasks.max": "1",
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "connection.uri": "<connection_uri>",
        "topic.prefix": "",
        "pipeline": "[{\"$match\": { \"$or\": [{\"fullDocument._kind\":\"collection\"},{\"fullDocument._isInverse\":false}],\"ns.coll\": {\"$regex\": /^(.*_related)$|^(my_collection_test)$/}}}]",
        "poll.await.time.ms": 5,
        "poll.max.batch.size": 2000,
        "transforms": "dropPrefix",
        "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.dropPrefix.regex": ".*",
        "transforms.dropPrefix.replacement": "mongodbTopic",
        "errors.tolerance": "none",
        "copy.existing": true
    }
}
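
To illustrate why the regex ".*" sends every collection to one topic, here is a rough Python sketch of what RegexRouter does with a topic name, as I understand it: if the regex matches the whole name, the name is rewritten with the replacement, otherwise it is left unchanged. This is only an approximation of the transform, not the Connect code itself. The `route` helper and the topic names such as "mydb.orders_related" are hypothetical.

```python
import re


def route(topic: str, regex: str, replacement: str) -> str:
    """Approximate RegexRouter: rewrite the topic only on a full match."""
    pattern = re.compile(regex)
    if pattern.fullmatch(topic) is None:
        # No full match: the record keeps its original topic.
        return topic
    # Replace the first match, which here spans the whole topic name.
    return pattern.sub(replacement, topic, count=1)


# Hypothetical per-collection topic names the connector would otherwise use.
for original in ["mydb.orders_related", "mydb.my_collection_test"]:
    print(original, "->", route(original, ".*", "mongodbTopic"))
```

Since ".*" matches any topic name in full, every record ends up in mongodbTopic regardless of its source collection.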

For more information on RegexRouter, see https://docs.confluent.io/platform/current/connect/transforms/regexrouter.html
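
With all collections sharing one topic, a consumer has to read the collection name from the message itself. A minimal sketch, assuming the record value is the change stream event serialized as a JSON string with the namespace under "ns" (the same field the pipeline filters on). The `collection_of` helper and the sample event are hypothetical, and the actual value layout depends on the converter and connector settings you use.

```python
import json


def collection_of(event_json: str) -> str:
    """Return the source collection name from a change event JSON string."""
    event = json.loads(event_json)
    # "ns" holds the namespace: {"db": ..., "coll": ...}
    return event["ns"]["coll"]


# Hypothetical event, shaped like a change stream insert.
sample = json.dumps({
    "operationType": "insert",
    "ns": {"db": "mydb", "coll": "orders_related"},
    "fullDocument": {"_kind": "collection"},
})

print(collection_of(sample))
```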

Upvotes: 2
