Reputation: 31
I'm very new to both Kafka and MongoDB, but I have an existing Kafka topic which I want to consume into a Mongo database. I'm trying to avoid building my own consumer or connector and instead use only a sink connector. That way I avoid the extra maintenance, cost, and complexity of running a separate connector/consumer component, as well as the work of building it in the first place. The complicated part is that I want to process the messages in two ways before doing any inserting:
Ignore messages whose values don't meet a condition (colour = green, for example)
Of those that do meet the condition, insert only certain fields from the message into the collection (see the sample message below)
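For example (the field names here are purely illustrative), given a message value like

{ "colour": "green", "size": "L", "price": 9.99 }

I'd want only, say, colour and size to end up in the inserted document, and a message with "colour": "red" to be skipped entirely.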
The transformation part I think I can do using Post Processors. However, I haven't seen an example of a way to conditionally ignore certain messages entirely, and I would have thought filtering would be fairly trivial for the sink connector compared to transforming data. Am I missing something?
Upvotes: 3
Views: 620
Reputation: 629
Here is what you can do:
# Debezium Filter SMT: drop any record whose value doesn't satisfy the condition
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.colour == 'green'

# MongoDB sink post processor: insert only the listed fields
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=colour,size,etc.
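Note that the Filter SMT is not bundled with Kafka Connect or the MongoDB sink connector. Per the Debezium documentation, you need to add the debezium-scripting module plus a JSR 223 engine (for Groovy: the groovy, groovy-json and groovy-jsr223 jars) to the connector's plugin directory, otherwise the jsr223.groovy language won't resolve.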
This is an example of how you can use the third-party Debezium Filter SMT to filter out messages and the MongoDB Kafka sink connector's native post processors to transform the output document.
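For context, here is a minimal sketch of a complete sink configuration combining both parts. The connector class, projection and post-processor properties are the MongoDB sink connector's own; the name, topics, connection.uri, database, collection and converter values are placeholders to adapt to your setup:

name=mongo-sink
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
topics=my-topic
connection.uri=mongodb://localhost:27017
database=my_db
collection=my_collection
# assumes plain JSON values; adjust the converter to your topic's format
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# Filter SMT: keep only records where colour == 'green'
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.colour == 'green'

# Post processor: project only the listed fields into the document
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=colour,size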
Upvotes: 0