jackyroo

Reputation: 99

Kafka Connect - MongoDB Source Connector - Pipeline Not Working

I am setting up a Kafka Connect connector using the MongoDB Source Connector.

The configuration looks like the following:

{
  "name": "MongoSourceConn",
  "config": {
    "name": "MongoSourceConn",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "publish.full.document.only": true,
    "topics": "test_topic",
    "connection.uri": "mongodb://siteUserAdmin:rstatools@rsgadcmgo5:27017",
    "database": "kafka",
    "collection": "test_topic",
    "pipeline": "[{ \"$match\": { \"$and\": [ {\"operationType\": { \"$in\": [ \"update\",\"insert\" ]}}, {\"jobStatus\": {\"$eq\": 5}} ] }} ]",
    "transforms": "dropPrefix",
    "transforms.dropPrefix.regex": "kafka.test_topic",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.replacement": "test_topic"
  }
}

If I remove the "pipeline" line, the Source Connector works fine, but obviously all the documents will be pushed to the Topic, which is not what I want.

If I add the "pipeline" line back, the Source Connector doesn't push any messages to my Topic, and I can't understand why. What am I missing? Here's what a document in our MongoDB collection looks like:

{
    "_id" : ObjectId("61570b1d21589e03f8011235"),
    "jobId" : "04bba49d-098b-4d4c-adde-4578d31f20df",
    "jobStatus" : 5,
    "data" : null,
    "createdOn" : "2021-10-01 13:20:29.215691"
}

The configuration is pushed through the REST API, which is why it has the "dictionary" look with all the escape characters (\").

Thanks.

Upvotes: 0

Views: 1429

Answers (1)

Dennis Jaheruddin

Reputation: 21563

It seems pretty clear that this pipeline will never match because it currently contains {\"operationType\": { \"$in\": [ \"update\",\"insert\" ]}}

You mention that you removed it, but without seeing more it is impossible to know exactly how you removed it, so perhaps something went wrong there.

Also, it is unclear exactly what the data looks like once you get it. You show a document as it is stored in Mongo, but it may get wrapped in something else (e.g. a change stream event), so the field jobStatus might not be available at the top level and may end up nested instead.
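To illustrate the nesting point, here is a minimal Python sketch. It assumes the pipeline is evaluated against change stream events rather than against the stored documents, in which case the document from the question would sit under fullDocument. The event below is abridged and hand-written for illustration, not captured from a real deployment, and pipeline_setting is just an illustrative variable name.

```python
import json

# Approximate, abridged shape of a change stream event for an insert.
# Assumption: the stored document sits under "fullDocument", not at the top level.
change_event = {
    "operationType": "insert",
    "ns": {"db": "kafka", "coll": "test_topic"},
    "fullDocument": {
        "jobId": "04bba49d-098b-4d4c-adde-4578d31f20df",
        "jobStatus": 5,
    },
}

# A filter on the nested field instead of a top-level jobStatus.
pipeline = [
    {"$match": {"fullDocument.jobStatus": 5}}
]

# The connector takes the pipeline as a JSON string; json.dumps produces it
# without hand-written escape characters.
pipeline_setting = json.dumps(pipeline)

print("top-level jobStatus present:", "jobStatus" in change_event)
print("pipeline setting:", pipeline_setting)
```

For update events, fullDocument is, as far as I know, only populated when the change stream is configured to look up the full document (the connector's change.stream.full.document setting with the value updateLookup), so that is worth checking as well.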

I would recommend these steps:

  1. Check how your data looks in Kafka when ingested without a pipeline
  2. Start with the most trivial pipeline that only does one thing
  3. Play with that until you get a pipeline that lets messages through
  4. Then keep expanding the logic until you are back at what you want
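The steps above could be sketched roughly as follows in Python. This is only an illustration under assumptions: the "fullDocument." prefix is a guess at how the data is wrapped, the connection URI is a placeholder, and CANDIDATES and build_request_body are hypothetical names. The script only prints the request bodies; posting them to the Connect REST API is left out.

```python
import json

# Candidate pipelines, ordered from trivial to the full filter.
# The "fullDocument." prefix is an assumption about how the data is wrapped.
CANDIDATES = [
    [{"$match": {}}],
    [{"$match": {"fullDocument.jobStatus": 5}}],
    [{"$match": {"operationType": {"$in": ["insert", "update"]},
                 "fullDocument.jobStatus": 5}}],
]


def build_request_body(pipeline, name="MongoSourceConn"):
    """Return a JSON request body with the pipeline embedded as a string."""
    config = {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "connection.uri": "mongodb://localhost:27017",  # placeholder URI
        "database": "kafka",
        "collection": "test_topic",
        # The pipeline setting is itself a JSON string, so it is serialized twice.
        "pipeline": json.dumps(pipeline),
    }
    return json.dumps({"name": name, "config": config}, indent=2)


if __name__ == "__main__":
    for step, pipeline in enumerate(CANDIDATES, start=1):
        print(f"--- candidate {step} ---")
        print(build_request_body(pipeline))
```

Trying the candidates one at a time shows which clause stops messages from arriving, and generating the body with json.dumps rules out escaping mistakes as a cause.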

I know these steps are a bit generic, but together with what is indicated above it is hopefully enough.

Upvotes: 2
