Reputation: 99
I am setting up a Kafka connector using the MongoDB Source Connector.
The configuration looks like this:
{
  "name": "MongoSourceConn",
  "config": {
    "name": "MongoSourceConn",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "publish.full.document.only": true,
    "topics": "test_topic",
    "connection.uri": "mongodb://siteUserAdmin:rstatools@rsgadcmgo5:27017",
    "database": "kafka",
    "collection": "test_topic",
    "pipeline": "[{ \"$match\": { \"$and\": [ {\"operationType\": { \"$in\": [ \"update\",\"insert\" ]}}, {\"jobStatus\": {\"$eq\": 5}} ] }} ]",
    "transforms": "dropPrefix",
    "transforms.dropPrefix.regex": "kafka.test_topic",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.replacement": "test_topic"
  }
}
If I remove the "pipeline" line, the Source Connector works fine, but obviously all the documents will be pushed to the Topic, which is not what I want.
If I add the "pipeline" line back, the Source Connector doesn't push any messages to my Topic, and I can't understand why. What am I missing? Here's what a document in our MongoDB collection looks like:
{
"_id" : ObjectId("61570b1d21589e03f8011235"),
"jobId" : "04bba49d-098b-4d4c-adde-4578d31f20df",
"jobStatus" : 5,
"data" : null,
"createdOn" : "2021-10-01 13:20:29.215691"
}
The configuration is pushed through the REST API, which is why it has the "dictionary" look with all the escape characters (\").
Thanks.
Upvotes: 0
Views: 1429
Reputation: 21563
It seems pretty clear that this pipeline will never match as written, because it currently contains {\"operationType\": { \"$in\": [ \"update\",\"insert\" ]}}.
You mention that you removed it, but without seeing more it is impossible to know exactly how you removed it, so perhaps something went wrong there.
It is also unclear what the data looks like by the time the pipeline sees it. You show a document as it is stored in Mongo, but it may get wrapped into something else (e.g. by change streams), in which case the field jobStatus would not be available at the top level and would end up nested.
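To illustrate the nesting point, here is a minimal Python sketch. It does not talk to MongoDB or Kafka; it only imitates the general shape of a change stream event, in which the stored document sits under fullDocument next to operationType. The names sample_event, get_path, field_equals and candidate_pipeline are made up for this example, the event is simplified, and the candidate pipeline is an assumption to try rather than a confirmed fix for this setup.

```python
import json

# Simplified, illustrative shape of a change stream "insert" event.
# Real events carry more fields; this only shows where the stored document lives.
sample_event = {
    "operationType": "insert",
    "ns": {"db": "kafka", "coll": "test_topic"},
    "documentKey": {"_id": "61570b1d21589e03f8011235"},
    "fullDocument": {
        "_id": "61570b1d21589e03f8011235",
        "jobId": "04bba49d-098b-4d4c-adde-4578d31f20df",
        "jobStatus": 5,
        "data": None,
        "createdOn": "2021-10-01 13:20:29.215691",
    },
}

_MISSING = object()  # sentinel so a stored None is not confused with "absent"


def get_path(doc, dotted_path):
    """Resolve a dotted field path through nested dicts (no array handling)."""
    current = doc
    for part in dotted_path.split("."):
        if not isinstance(current, dict) or part not in current:
            return _MISSING
        current = current[part]
    return current


def field_equals(doc, dotted_path, expected):
    """True only if the path exists in doc and its value equals expected."""
    value = get_path(doc, dotted_path)
    return value is not _MISSING and value == expected


# Hypothetical pipeline that addresses the nested field instead of the top level.
candidate_pipeline = [
    {
        "$match": {
            "operationType": {"$in": ["update", "insert"]},
            "fullDocument.jobStatus": 5,
        }
    }
]

# The connector expects "pipeline" as a string, so serialize it once here;
# a JSON library will add the backslash escapes when this string is embedded
# in the REST request body.
pipeline_config_value = json.dumps(candidate_pipeline)

if __name__ == "__main__":
    print(field_equals(sample_event, "jobStatus", 5))               # False
    print(field_equals(sample_event, "fullDocument.jobStatus", 5))  # True
    print(pipeline_config_value)
```

If the events in your setup really have this shape, a $match on the top-level jobStatus can never succeed, while one on fullDocument.jobStatus can. Keep in mind that for update events fullDocument is only populated when full-document lookup is enabled on the change stream, so that is worth checking as well.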
I would recommend these steps:
I know these steps are a bit generic, but together with what is indicated above it is hopefully enough.
Upvotes: 2