Reputation: 23
I am getting the InconsistentGroupProtocolException infrequently when running Debezium SQL DB connector on an Azure AKS pod.
*org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.*
The Debezium connector (for SQl DB) runs fine for a while, sometimes a day or more and then fails with this error.
There is a similar question posted here, and trying to understand the resolution mentioned better.
I am using a group ID in the docker file that installs Debezium. And there is a consumer group ID (schema.history.internal.consumer.group.id) in the SQL DB connector registration. The same group ID is being used in both places. So the group ID in the docker file is the one used by Kafka connect group that publishes messages from Debezium?
Has anyone faced a similar issue? And what can be done to resolve?
Upvotes: 0
Views: 250
Reputation: 601
I spent quite many hours to find out what's causing this in none AKS environment. The problem is that you're using the same id in the both schema.history.internal.consumer.group.id and in Kafka Connect CONNECT_GROUP_ID. If you change them to be different, this problem disappears. It might be related somehow to the fact that producer and consumer cannot have the same ids. Make sure the ids are different.
Upvotes: 1
Reputation: 3771
To address the InconsistentGroupProtocolException
, the following points should be considered
Ensure that the group ID specified in the Debezium connector configuration is unique and not shared with other applications or connectors that might cause conflicts. Use the same version of the Debezium connector across your deployment to prevent inconsistencies.
Debezium can be deployed on AKS as below.
First create a namespace
kubectl create ns debezium-example
for the Debezium deployment here I will use Strimzi, which manages the Kafka deployment on Kubernetes. Please see the Strimzi deployment documentation for more details on how to deploy Strimzi on your Kubernetes cluster. you can install it by running the following command:
curl -sL https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.20.0/install.sh | bash -s v0.20.0
Now, install Strimzi operator itself
when deploying Debezium Kafka connector, we will need to provide the username and password for the connector to be able to connect to the database. Kubernetes provides the Secret
object for this purpose. Besides creating the Secret
object itself, we have to also create a role and a role binding so that Kafka can access the credentials.
cat << EOF | kubectl create -n debezium-example -f -
apiVersion: v1
kind: Secret
metadata:
name: debezium-secret
namespace: debezium-example
type: Opaque
data:
username: ZGViZXppdW0=
password: ZGJ6
EOF
Now, we can create a role, which refers the secret created in the previous step
cat << EOF | kubectl create -n debezium-example -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: connector-configuration-role
namespace: debezium-example
rules:
- apiGroups: [""]
resources: ["secrets"]
resourceNames: ["debezium-secret"]
verbs: ["get"]
EOF
We also have to bind this role to the Kafka Connect cluster service account so that Kafka Connect can access the secret
cat << EOF | kubectl create -n debezium-example -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: connector-configuration-role-binding
namespace: debezium-example
subjects:
- kind: ServiceAccount
name: debezium-connect-cluster-connect
namespace: debezium-example
roleRef:
kind: Role
name: connector-configuration-role
apiGroup: rbac.authorization.k8s.io
EOF
Next, deploy your Kafka cluster
cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: debezium-cluster
spec:
kafka:
replicas: 1
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
authentication:
type: tls
- name: external
port: 9094
type: nodeport
tls: false
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 100Gi
deleteClaim: false
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
zookeeper:
replicas: 1
storage:
type: persistent-claim
size: 100Gi
deleteClaim: false
entityOperator:
topicOperator: {}
userOperator: {}
EOF
and wait until it's ready
kubectl wait kafka/debezium-cluster --for=condition=Ready --timeout=300s -n debezium-example
Here I will be using MySQL and create the service and deployment accordingly.
cat << EOF | kubectl create -n debezium-example -f -
apiVersion: v1
kind: Service
metadata:
name: mysql
spec:
ports:
- port: 3306
selector:
app: mysql
clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: mysql
spec:
selector:
matchLabels:
app: mysql
strategy:
type: Recreate
template:
metadata:
labels:
app: mysql
spec:
containers:
- image: quay.io/debezium/example-mysql:2.4
name: mysql
env:
- name: MYSQL_ROOT_PASSWORD
value: debezium
- name: MYSQL_USER
value: mysqluser
- name: MYSQL_PASSWORD
value: mysqlpw
ports:
- containerPort: 3306
name: mysql
EOF
To deploy a Debezium connector, you need to deploy a Kafka Connect cluster with the required connector plugins, before instantiating the actual connector itself. As the first step, a container image for Kafka Connect with the plugin has to be created. If you already have a container image built and available in the registry, you can skip this step.
here, I will use Strimzi for creating the Kafka Connect cluster.
cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: debezium-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
spec:
version: 3.1.0
replicas: 1
bootstrapServers: debezium-cluster-kafka-bootstrap:9092
config:
config.providers: secrets
config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
# -1 means it will use the default replication factor configured in the broker
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1
build:
output:
type: docker
image: arkoacr.azurecr.io/debezium-connect-mysql:latest
plugins:
- name: debezium-mysql-connector
artifacts:
- type: tgz
url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/{debezium-version}/debezium-connector-mysql-{debezium-version}-plugin.tar.gz
EOF
Note, you need to replace the name of the registry with your registry where you have pushed the image.
If you already have a suitable container image either in the local or a remote registry (such as acr), you can use this simplified version.
cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: debezium-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
spec:
version: 3.1.0
image: arkoacr.azurecr.io/debezium-connect-mysql:latest
replicas: 1
bootstrapServers: debezium-cluster-kafka-bootstrap:9092
config:
config.providers: secrets
config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
# -1 means it will use the default replication factor configured in the broker
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1
EOF
To create a Debezium connector, you just need to create a KafkaConnector
with the appropriate configuration, MySQL in this case.
cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: debezium-connector-mysql
labels:
strimzi.io/cluster: debezium-connect-cluster
spec:
class: io.debezium.connector.mysql.MySqlConnector
tasksMax: 1
config:
tasks.max: 1
database.hostname: mysql
database.port: 3306
database.user: ${secrets:debezium-example/debezium-secret:username}
database.password: ${secrets:debezium-example/debezium-secret:password}
database.server.id: 184054
topic.prefix: mysql
database.include.list: inventory
schema.history.internal.kafka.bootstrap.servers: debezium-cluster-kafka-bootstrap:9092
schema.history.internal.kafka.topic: schema-changes.inventory
EOF
Reference- Debezium on k8s
Upvotes: 0