Sindhu

Reputation: 23

Debezium SQL DB connector on AKS - InconsistentGroupProtocolException

I intermittently get an InconsistentGroupProtocolException when running the Debezium SQL DB connector on an Azure AKS pod.

*org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.*

The Debezium connector (for SQL DB) runs fine for a while, sometimes a day or more, and then fails with this error.

There is a similar question posted here, and I am trying to better understand the resolution mentioned there.

org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing

I set a group ID in the Dockerfile that installs Debezium, and there is also a consumer group ID (schema.history.internal.consumer.group.id) in the SQL DB connector registration. The same group ID is used in both places. Is the group ID in the Dockerfile the one used by the Kafka Connect group that publishes messages from Debezium?

Has anyone faced a similar issue? What can be done to resolve it?

Upvotes: 0

Views: 250

Answers (2)

Perttu T

Reputation: 601

I spent many hours tracking down the cause of this in a non-AKS environment. The problem is that you're using the same ID for both schema.history.internal.consumer.group.id and the Kafka Connect CONNECT_GROUP_ID. If you change them to be different, the problem disappears. It might be related to the fact that a producer and a consumer cannot share the same ID. Make sure the IDs are different.
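
A likely explanation is that Kafka Connect workers and plain consumers join a group with different protocol types, so a single group ID cannot serve both. As a minimal sketch, a check like the following could run before deployment. The function name check_group_ids and the two example IDs are hypothetical; substitute the values from your own Connect image and connector registration.

```shell
#!/bin/sh
# Hypothetical helper: fail fast if the Kafka Connect worker group ID and the
# schema history consumer group ID are identical.
check_group_ids() {
  connect_group_id="$1"   # the Connect worker group ID (CONNECT_GROUP_ID above)
  history_group_id="$2"   # schema.history.internal.consumer.group.id
  if [ "$connect_group_id" = "$history_group_id" ]; then
    echo "group IDs collide: $connect_group_id" >&2
    return 1
  fi
  echo "group IDs are distinct"
}

# Example values (assumptions, not taken from the question)
check_group_ids "debezium-connect" "debezium-schema-history"
```

The function returns a non-zero status when the two IDs match, so it can gate a deployment script.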

Upvotes: 1

Arko

Reputation: 3771

To address the InconsistentGroupProtocolException, consider the following points:

Ensure that the group ID specified in the Debezium connector configuration is unique and not shared with other applications or connectors that might cause conflicts. Use the same version of the Debezium connector across your deployment to prevent inconsistencies.

Debezium can be deployed on AKS as below.

First, create a namespace:

kubectl create ns debezium-example

For the Debezium deployment I will use Strimzi, which manages the Kafka deployment on Kubernetes. Please see the Strimzi deployment documentation for more details on how to deploy Strimzi on your Kubernetes cluster. One option is to install it through the Operator Lifecycle Manager (OLM), which you can install first by running the following command:

curl -sL https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.20.0/install.sh | bash -s v0.20.0


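Now install the Strimzi operator itself. The Debezium guide referenced at the end of this answer does this from the OperatorHub manifest:

kubectl create -f https://operatorhub.io/install/strimzi-kafka-operator.yaml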

When deploying the Debezium Kafka connector, we need to provide the username and password that the connector uses to connect to the database. Kubernetes provides the Secret object for this purpose. Besides creating the Secret object itself, we also have to create a role and a role binding so that Kafka Connect can access the credentials.

cat << EOF | kubectl create -n debezium-example -f -
apiVersion: v1
kind: Secret
metadata:
  name: debezium-secret
  namespace: debezium-example
type: Opaque
data:
  username: ZGViZXppdW0=
  password: ZGJ6
EOF
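
The values under data must be base64-encoded. As a small sketch, the two values above can be reproduced in a shell as shown here; they decode to debezium and dbz. printf is used instead of echo so that no trailing newline ends up in the encoded value.

```shell
# Encode the credentials for the Secret's data fields.
username_b64=$(printf '%s' 'debezium' | base64)
password_b64=$(printf '%s' 'dbz' | base64)

echo "username: $username_b64"   # username: ZGViZXppdW0=
echo "password: $password_b64"   # password: ZGJ6
```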


Now we can create a role that refers to the secret created in the previous step:

cat << EOF | kubectl create -n debezium-example -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
  namespace: debezium-example
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["debezium-secret"]
  verbs: ["get"]
EOF


We also have to bind this role to the Kafka Connect cluster service account so that Kafka Connect can access the secret:

cat << EOF | kubectl create -n debezium-example -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
  namespace: debezium-example
subjects:
- kind: ServiceAccount
  name: debezium-connect-cluster-connect
  namespace: debezium-example
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io
EOF


Next, deploy your Kafka cluster:

cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: debezium-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: nodeport
        tls: false
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF


Then wait until it's ready:

kubectl wait kafka/debezium-cluster --for=condition=Ready --timeout=300s -n debezium-example


This example uses MySQL as the source database. Create its service and deployment as follows:

cat << EOF | kubectl create -n debezium-example -f -
apiVersion: v1
kind: Service
metadata:
  name: mysql
spec:
  ports:
  - port: 3306
  selector:
    app: mysql
  clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql
spec:
  selector:
    matchLabels:
      app: mysql
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
      - image: quay.io/debezium/example-mysql:2.4
        name: mysql
        env:
        - name: MYSQL_ROOT_PASSWORD
          value: debezium
        - name: MYSQL_USER
          value: mysqluser
        - name: MYSQL_PASSWORD
          value: mysqlpw
        ports:
        - containerPort: 3306
          name: mysql
EOF

To deploy a Debezium connector, you need to deploy a Kafka Connect cluster with the required connector plugins, before instantiating the actual connector itself. As the first step, a container image for Kafka Connect with the plugin has to be created. If you already have a container image built and available in the registry, you can skip this step.

Here, I will use Strimzi to create the Kafka Connect cluster:

cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  replicas: 1
  bootstrapServers: debezium-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      image: arkoacr.azurecr.io/debezium-connect-mysql:latest
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/{debezium-version}/debezium-connector-mysql-{debezium-version}-plugin.tar.gz
EOF

Note: replace the registry name in the image field with your own registry, to which the built image will be pushed.

If you already have a suitable container image in either a local or a remote registry (such as Azure Container Registry), you can use this simplified version:
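
The {debezium-version} placeholder in the artifact URL also has to be replaced with a concrete release. Here is a sketch of building the URL in the shell, assuming version 2.4.0.Final. That version is an assumption chosen to match the example-mysql:2.4 image tag above; use whichever release you actually deploy.

```shell
# Assumed Debezium release; replace with the version you actually deploy.
DEBEZIUM_VERSION="2.4.0.Final"
BASE_URL="https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql"
PLUGIN_URL="${BASE_URL}/${DEBEZIUM_VERSION}/debezium-connector-mysql-${DEBEZIUM_VERSION}-plugin.tar.gz"

echo "$PLUGIN_URL"
```

The resulting value goes into the url field of the plugin artifact in the KafkaConnect resource.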

cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  image: arkoacr.azurecr.io/debezium-connect-mysql:latest
  replicas: 1
  bootstrapServers: debezium-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
EOF


To create a Debezium connector, you just need to create a KafkaConnector resource with the appropriate configuration, for MySQL in this case:

# The heredoc delimiter is quoted so the shell leaves the ${secrets:...} placeholders untouched
cat << 'EOF' | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql
    database.port: 3306
    database.user: ${secrets:debezium-example/debezium-secret:username}
    database.password: ${secrets:debezium-example/debezium-secret:password}
    database.server.id: 184054
    topic.prefix: mysql
    database.include.list: inventory
    schema.history.internal.kafka.bootstrap.servers: debezium-cluster-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.inventory
EOF


Reference: Debezium on k8s

Upvotes: 0
