Mr9

Reputation: 159

Configure Pipeline in MongoDB Kafka source connector

I am using the following configuration for a source connector to filter and read only records with Status "PENDING" from MongoDB. I need to poll all records that are inserted or updated with PENDING status, and nothing else. The source connector is able to poll all records if the pipeline is excluded. Can someone help me understand how to fix this? Also, is there a way to know that polling has completed, the way a batch job completes, so that I can orchestrate another process on the Kafka consumer?

name=mongo-source-demo
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=mongodb://username:password@hostname:27017
database=test
collection=mongoDBtest
topic.prefix=mongodb.connector
poll.max.batch.size=1000
poll.await.time.ms=100000
publish.full.document.only=true
pipeline=[{"$match": { "Status" : "PENDING" }},{"$project":{"_id":1,"fullDocument":1}} ]

Upvotes: 0

Views: 1344

Answers (2)

Abhijeet B Patil

Reputation: 1

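You need to change your pipeline config to the one below. The pipeline is applied to change stream events, not to the stored documents, so the document's fields sit under fullDocument and the match has to reference fullDocument.Status. With this change the connector publishes only documents whose Status is PENDING. You can add other filters to the pipeline in the same way. The line below is the JSON form used when the connector is created through the Kafka Connect REST API; in a .properties file, drop the outer quotes and the backslash escapes.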

"pipeline": "[{ \"$match\":{\"fullDocument.Status\":\"PENDING\"} }]"
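To see why the field path matters, here is a minimal Python sketch. It does not talk to MongoDB or Kafka; `get_path` and `matches` are hypothetical helpers that only imitate an equality `$match` with dotted paths, and the event dictionaries are an approximation of what a change stream event looks like.

```python
# Simplified stand-in for an equality-only $match with dotted field paths.
# These helpers are illustrative and are NOT part of any MongoDB or Kafka API.

def get_path(doc, dotted):
    """Walk a dotted path such as 'fullDocument.Status' through nested dicts."""
    current = doc
    for key in dotted.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def matches(event, condition):
    """Return True when every path in the condition equals the expected value."""
    return all(get_path(event, path) == expected
               for path, expected in condition.items())


# Approximate shape of an insert change event (assumed, trimmed for brevity).
insert_event = {
    "operationType": "insert",
    "ns": {"db": "test", "coll": "mongoDBtest"},
    "documentKey": {"_id": 1},
    "fullDocument": {"_id": 1, "Status": "PENDING"},
}

# Approximate shape of an update event when no full-document lookup is enabled:
# it carries the changed fields but no fullDocument.
update_event = {
    "operationType": "update",
    "ns": {"db": "test", "coll": "mongoDBtest"},
    "documentKey": {"_id": 1},
    "updateDescription": {"updatedFields": {"Status": "PENDING"}},
}

top_level = matches(insert_event, {"Status": "PENDING"})
nested = matches(insert_event, {"fullDocument.Status": "PENDING"})
update_nested = matches(update_event, {"fullDocument.Status": "PENDING"})

print(top_level, nested, update_nested)
```

The top-level match finds nothing because the event itself has no Status field. The last case suggests why updates may still be missed: as far as I know, update events only include fullDocument when the connector is configured with change.stream.full.document=updateLookup, so check that setting if inserts arrive but updates do not.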

Upvotes: 0

Ricardo Moreira

Reputation: 41

The problem is the double quotes ("") around the MongoDB operators. The correct pipeline configuration should be:

pipeline=[{$match: { "Status" : "PENDING" }},{$project:{"_id":1,"fullDocument":1}} ]

Upvotes: 0
