Alex Hill

Reputation: 31

Filtering and transforming messages with the MongoDB Kafka Sink Connector?

I'm very new to both Kafka and MongoDB, but I have an existing Kafka topic that I want to consume into a Mongo database. I'm trying to avoid building my own consumer or connector and instead use only a sink connector: that way I avoid the maintenance, cost, and complications of running a separate consumer/connector component, as well as the work of building it in the first place. The complicated part is that I want to process the messages in two ways before inserting anything:

  1. Ignore messages whose values don't meet a condition (colour = green, for example)

  2. Of those that do meet the condition, only insert certain fields from the message into the collection

The transformation part I think I can do using Post Processors. However, I haven't seen an example of a way to conditionally ignore certain messages entirely. I would have thought this would be something fairly trivial for the Sink Connector to do compared to transforming data. Am I missing something?
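
For concreteness, here's a hypothetical example of what I mean (the field names are invented for illustration):

    Incoming message value:          {"colour": "green", "size": "large", "internalId": "abc-123"}
    Skipped (colour is not green):   {"colour": "red", "size": "small", "internalId": "def-456"}
    Document inserted (projected):   {"colour": "green", "size": "large"}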

Upvotes: 3

Views: 620

Answers (1)

SerhiiH

Reputation: 629

Here is what you can do:

  1. Use Red Hat's Debezium Filter SMT to accept only messages that satisfy a condition:
    transforms=filter
    transforms.filter.type=io.debezium.transforms.Filter
    transforms.filter.language=jsr223.groovy
    transforms.filter.condition=value.colour == 'green'
    
  2. Configure the sink connector's post processor chain so that only the required fields are loaded into the collection:
    post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
    value.projection.type=AllowList
    value.projection.list=colour,size,etc.
    

This is an example of how you can use the third-party Debezium Filter SMT to filter out messages and the MongoDB Kafka sink connector's native post processors to transform the output document.
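
For completeness, here is a sketch of how the two pieces might fit together in a single sink connector configuration. The connector name, topic, connection URI, database, collection and projected field names below are placeholders for illustration; adjust them to your setup. Note also that the Debezium Filter SMT ships separately in the debezium-scripting module and needs a JSR 223 engine such as Groovy on the Connect worker's classpath:

    # MongoDB Kafka sink connector (all names below are placeholders)
    name=mongo-green-sink
    connector.class=com.mongodb.kafka.connect.MongoSinkConnector
    topics=my-topic
    connection.uri=mongodb://localhost:27017
    database=mydb
    collection=mycollection

    # Step 1: drop messages whose colour is not green (Debezium Filter SMT)
    transforms=filter
    transforms.filter.type=io.debezium.transforms.Filter
    transforms.filter.language=jsr223.groovy
    transforms.filter.condition=value.colour == 'green'

    # Step 2: insert only the allowed fields into the output document
    post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
    value.projection.type=AllowList
    value.projection.list=colour,size

Since the SMT runs in the Connect framework before records reach the sink task, filtered messages are dropped entirely, and the post processor only shapes the documents that will actually be inserted.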

Upvotes: 0
