I am using the following configuration for the MongoDB source connector to filter and read only records whose status is "PENDING". I need to poll every record that is inserted or updated with PENDING status, and nothing else. The source connector polls all records if the pipeline is left out, but with the pipeline below it returns nothing. Can someone help me understand how to fix this? Also, is there a way to know that polling has completed, the way a batch job completes, so that I can orchestrate another process on the Kafka consumer side?
name=mongo-source-demo
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=mongodb://username:password@hostname:27017
database=test
collection=mongoDBtest
topic.prefix=mongodb.connector
poll.max.batch.size=1000
poll.await.time.ms=100000
publish.full.document.only=true
pipeline=[{"$match": { "Status" : "PENDING" }},{"$project":{"_id":1,"fullDocument":1}} ]
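The connector reads a MongoDB change stream, so the pipeline runs against change events rather than against the stored documents. Each insert event nests the document under fullDocument, which is why a $match on a top-level Status field never matches. The Python sketch below is a simplified, hypothetical model of this: the event values and the get_path/matches helpers are illustrative only, not the connector's code or MongoDB's real $match implementation. It also shows that an update event delivered without a full-document lookup has no fullDocument to match on.

```python
from typing import Any

# Hypothetical, trimmed-down change events (real events also carry fields
# such as ns, clusterTime and documentKey).
insert_event = {
    "_id": {"_data": "resume-token-placeholder"},
    "operationType": "insert",
    "fullDocument": {"_id": 1, "Status": "PENDING"},
}

# An update event as delivered by default: only the delta, no fullDocument.
update_event = {
    "_id": {"_data": "resume-token-placeholder"},
    "operationType": "update",
    "updateDescription": {"updatedFields": {"Status": "PENDING"}, "removedFields": []},
}


def get_path(doc: dict[str, Any], path: str) -> Any:
    """Resolve a dotted path such as 'fullDocument.Status'; None if absent."""
    current: Any = doc
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def matches(doc: dict[str, Any], criteria: dict[str, Any]) -> bool:
    """Equality-only stand-in for $match: every path must equal its value."""
    return all(get_path(doc, path) == value for path, value in criteria.items())


print(matches(insert_event, {"Status": "PENDING"}))               # False
print(matches(insert_event, {"fullDocument.Status": "PENDING"}))  # True
print(matches(update_event, {"fullDocument.Status": "PENDING"}))  # False
```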
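Change your pipeline config as shown below and it should work. The connector reads change stream events, which nest the inserted or updated document under fullDocument, so the $match has to use fullDocument.Status instead of Status. This filters the events and gives you only the documents from Mongo whose Status is PENDING. For update events, fullDocument is only populated when change.stream.full.document=updateLookup is set. You can also add other filters to the pipeline.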
"pipeline": "[{ \"$match\":{\"fullDocument.Status\":\"PENDING\"} }]"
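The escaped quotes above come from registering the connector through the Kafka Connect REST API, where the pipeline value is a JSON string nested inside the JSON request body. A minimal Python sketch, assuming the same settings as the question and adding change.stream.full.document=updateLookup so that update events carry fullDocument, builds that body without escaping anything by hand:

```python
import json

# Match on the nested document, since change events wrap it in fullDocument.
pipeline = [{"$match": {"fullDocument.Status": "PENDING"}}]

connector = {
    "name": "mongo-source-demo",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "tasks.max": "1",
        "connection.uri": "mongodb://username:password@hostname:27017",
        "database": "test",
        "collection": "mongoDBtest",
        "topic.prefix": "mongodb.connector",
        "publish.full.document.only": "true",
        # Ask MongoDB to look up the full document for update events too.
        "change.stream.full.document": "updateLookup",
        # The pipeline must be a JSON *string*, so serialize it separately;
        # the outer json.dumps then escapes its quotes automatically.
        "pipeline": json.dumps(pipeline),
    },
}

body = json.dumps(connector, indent=2)
print(body)
```

The printed body is what would be POSTed to the Connect worker's /connectors endpoint. Serializing the pipeline from a Python list keeps the inner and outer quoting consistent, which is the part that is easy to get wrong when typing the escaped string manually.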
The problem is the double quotes around the MongoDB operators. The pipeline configuration should be:
pipeline=[{$match: { "Status" : "PENDING" }},{$project:{"_id":1,"fullDocument":1}} ]