Reputation: 2550
I am trying to capture data from Kafka using the MongoDB Debezium connector, but I get an error when I try to read it with KafkaJS:
KafkaJSProtocolError: The group member's supported protocols are incompatible with those of existing members
I am using docker images to capture data.
Here are the steps I am following:
Start Zookeeper
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:latest
Start Kafka
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:latest
I already have MongoDB running in replica set mode.
Start the Debezium Kafka Connect container
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka debezium/connect:latest
Then POST the MongoDB connector configuration
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "mongodb-connector", "config": { "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", "mongodb.hosts": "rs0/abc.com:27017", "mongodb.name": "fullfillment", "collection.whitelist": "mongodev.test", "mongodb.user": "kafka", "mongodb.password": "kafka01" } }'
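After posting the configuration, the connector's state can be checked through the Kafka Connect REST API (a quick sanity check; the connector name here matches the "name" field in the config above, and Connect is assumed to be reachable on localhost:8083 as in the docker run command):

```shell
# Should report "state": "RUNNING" for the connector and its task
curl -s localhost:8083/connectors/mongodb-connector/status
```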
With this, if I run a watcher Docker container, I can see the data in JSON format in the console:
docker run -it --name watchermongo --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k fullfillment.mongodev.test
But I want to capture this data in an application so that I can manipulate it, process it, and push it to Elasticsearch. For that I am using
https://github.com/tulios/kafkajs
However, when I run the consumer code, I get an error. Here is the code:
//'use strict';
// clientId=connect-1, groupId=1
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'connect-1',
  brokers: ['localhost:9092', 'localhost:9093']
})

// Consuming
const consumer = kafka.consumer({ groupId: '1' })

const consumeMessage = async () => {
  await consumer.connect()
  await consumer.subscribe({ topic: 'fullfillment.mongodev.test' })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        value: message.value.toString(),
      })
    },
  })
}

consumeMessage();
KafkaJSProtocolError: The group member's supported protocols are incompatible with those of existing members
Upvotes: 1
Views: 5493
Reputation: 2562
You should not use the same groupId in both Connect and your KafkaJS consumer. If you do, they become members of the same consumer group, which means each message would be delivered to only one of them, if the group could form at all.
If you change the groupId of your KafkaJS consumer to something unique, it should work.
Note that by default a new KafkaJS consumer group will start consuming from the latest offset, so it won't consume messages that were already produced. You can override this behavior with the fromBeginning flag in the consumer.subscribe call. See https://kafka.js.org/docs/consuming#from-beginning
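Putting both points together, a corrected consumer might look like the sketch below (assumptions: a broker on localhost:9092 as in the docker run commands, the topic name from the question, and an arbitrary example group ID kafkajs-consumer-1 — any value other than the Connect worker's GROUP_ID of "1" works):

```javascript
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'cdc-consumer',
  brokers: ['localhost:9092']
})

// A groupId distinct from the Connect worker's GROUP_ID ("1")
const consumer = kafka.consumer({ groupId: 'kafkajs-consumer-1' })

const run = async () => {
  await consumer.connect()
  // fromBeginning: true makes a brand-new group replay existing messages
  await consumer.subscribe({ topic: 'fullfillment.mongodev.test', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(message.value.toString())
    },
  })
}

run().catch(console.error)
```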
Upvotes: 4