Abdul Azizi
Abdul Azizi

Reputation: 11

Cannot authenticate to my newly created strimzi kafka cluster

I have a brand new strimzi cluster setup in AWS EKS that I am trying to test authentication via a python script to access the newly created topic using the newly created kafka user.

Below are my manifests, and the python script. Please advise.

Kafka Node pools and Cluster

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: kraft-controller
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - controller
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        kraftMetadata: shared
        deleteClaim: false
        class: test-platform-team-sc
---

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: kafka-broker
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        kraftMetadata: shared
        deleteClaim: false
        class: test-platform-team-sc

---

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: 3.8.0
    metadataVersion: 3.8-IV0
    authorization:
        type: simple
    listeners:
      - name: plain
        port: 9092
        type: loadbalancer
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
  entityOperator:
    # ...
    topicOperator:
      watchedNamespace: strimzi-kafka
      reconciliationIntervalMs: 60000
      resources:
        requests:
          cpu: "1"
          memory: 500Mi
        limits:
          cpu: "1"
          memory: 500Mi
    # ...
    userOperator:
      watchedNamespace: strimzi-kafka
      reconciliationIntervalMs: 60000
      resources:
        requests:
          cpu: "1"
          memory: 500Mi
        limits:
          cpu: "1"
          memory: 500Mi

---

Kafka Topics

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: flink-input
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 10
  replicas: 2

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: flink-output
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 10
  replicas: 2

Kafka User

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: flink
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      # Example consumer Acls for topic my-topic using consumer group my-group
      - resource:
          type: topic
          name: flink-input
          patternType: literal
        operations:
          - Create
          - Describe
          - Write
          - Read
        host: "*"
      - resource:
          type: group
          name: my-group
          patternType: literal
        operations:
          - Read
        host: "*"
      # Example Producer Acls for topic my-topic
      - resource:
          type: topic
          name: flink-output
          patternType: literal
        operations:
          - Create
          - Describe
          - Write
          - Read
        host: "*"

Python Script

import logging
from kafka import KafkaConsumer
from kafka.errors import KafkaError

# Enable logging for kafka-python
logging.basicConfig(level=logging.DEBUG)

# Configuration
BROKER = "AWSELBFQDN:9092"
TOPIC = "flink-input"
USERNAME = "flink"
PASSWORD = "DecodedSecretPassword"

try:
    consumer = KafkaConsumer(
        TOPIC,
        bootstrap_servers=[BROKER],
        security_protocol="SASL_PLAINTEXT",
        sasl_mechanism="PLAIN",
        sasl_plain_username=USERNAME,
        sasl_plain_password=PASSWORD,
        group_id="my-group",
        auto_offset_reset="earliest",
        consumer_timeout_ms=10000
    )

    print("Successfully connected to Kafka broker.")
    print(f"Listening to topic: {TOPIC}")
    for message in consumer:
        print(f"Received message: {message.value.decode('utf-8')}")
        break
    consumer.close()

except KafkaError as e:
    print(f"Failed to connect to Kafka broker: {e}")

This is the actual exception, i tried using bootstrap server LB FQDN and each of the 3 brokers LB FQDNs and no luck :(

Failed to connect to Kafka broker: NoBrokersAvailable

pods are healthy and services and user are available

kubectl get pods -n strimzi-kafka
NAME                                          READY   STATUS    RESTARTS   AGE
my-cluster-entity-operator-6f4cc9b654-frzsw   2/2     Running   0          53m
my-cluster-kafka-broker-0                     1/1     Running   0          16m
my-cluster-kafka-broker-1                     1/1     Running   0          15m
my-cluster-kafka-broker-2                     1/1     Running   0          16m
my-cluster-kraft-controller-3                 1/1     Running   0          60m
my-cluster-kraft-controller-4                 1/1     Running   0          61m
my-cluster-kraft-controller-5                 1/1     Running   0          60m
strimzi-cluster-operator-6f9fbb4c75-twx8p     1/1     Running   0          75m

kubectl get services -n strimzi-kafka
NAME                               TYPE           CLUSTER-IP       EXTERNAL-IP                                                                   PORT(S)                               AGE
my-cluster-kafka-bootstrap         ClusterIP      10.100.234.142   <none>                                                                        9091/TCP,9093/TCP                     15d
my-cluster-kafka-broker-plain-0    LoadBalancer   10.100.193.37    awslbfqdn9092:31351/TCP                        17m
my-cluster-kafka-broker-plain-1    LoadBalancer   10.100.234.209   awslbfqdn9092:31518/TCP                        17m
my-cluster-kafka-broker-plain-2    LoadBalancer   10.100.233.6     awslbfqdn9092:31231/TCP                        17m
my-cluster-kafka-brokers           ClusterIP      None             <none>                                                                        9090/TCP,9091/TCP,8443/TCP,9093/TCP   15d
my-cluster-kafka-plain-bootstrap   LoadBalancer   10.100.13.61     awslbfqdn9092:31631/TCP                        17m

kubectl get kafkausers -n strimzi-kafka
NAME    CLUSTER      AUTHENTICATION   AUTHORIZATION   READY
flink   my-cluster   scram-sha-512    simple          True

Upvotes: 1

Views: 22

Answers (0)

Related Questions