Reputation: 11
I have a brand new strimzi cluster setup in AWS EKS that I am trying to test authentication via a python script to access the newly created topic using the newly created kafka user.
Below are my manifests, and the python script. Please advise.
Kafka Node pools and Cluster
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: kraft-controller
labels:
strimzi.io/cluster: my-cluster
spec:
replicas: 3
roles:
- controller
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 10Gi
kraftMetadata: shared
deleteClaim: false
class: test-platform-team-sc
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: kafka-broker
labels:
strimzi.io/cluster: my-cluster
spec:
replicas: 3
roles:
- broker
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 10Gi
kraftMetadata: shared
deleteClaim: false
class: test-platform-team-sc
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
annotations:
strimzi.io/node-pools: enabled
strimzi.io/kraft: enabled
spec:
kafka:
version: 3.8.0
metadataVersion: 3.8-IV0
authorization:
type: simple
listeners:
- name: plain
port: 9092
type: loadbalancer
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
entityOperator:
# ...
topicOperator:
watchedNamespace: strimzi-kafka
reconciliationIntervalMs: 60000
resources:
requests:
cpu: "1"
memory: 500Mi
limits:
cpu: "1"
memory: 500Mi
# ...
userOperator:
watchedNamespace: strimzi-kafka
reconciliationIntervalMs: 60000
resources:
requests:
cpu: "1"
memory: 500Mi
limits:
cpu: "1"
memory: 500Mi
---
Kafka Topics
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: flink-input
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 10
replicas: 2
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: flink-output
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 10
replicas: 2
Kafka User
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
name: flink
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: scram-sha-512
authorization:
type: simple
acls:
# Example consumer Acls for topic my-topic using consumer group my-group
- resource:
type: topic
name: flink-input
patternType: literal
operations:
- Create
- Describe
- Write
- Read
host: "*"
- resource:
type: group
name: my-group
patternType: literal
operations:
- Read
host: "*"
# Example Producer Acls for topic my-topic
- resource:
type: topic
name: flink-output
patternType: literal
operations:
- Create
- Describe
- Write
- Read
host: "*"
Python Script
import logging
from kafka import KafkaConsumer
from kafka.errors import KafkaError
# Enable logging for kafka-python
logging.basicConfig(level=logging.DEBUG)
# Configuration
BROKER = "AWSELBFQDN:9092"
TOPIC = "flink-input"
USERNAME = "flink"
PASSWORD = "DecodedSecretPassword"
try:
consumer = KafkaConsumer(
TOPIC,
bootstrap_servers=[BROKER],
security_protocol="SASL_PLAINTEXT",
sasl_mechanism="PLAIN",
sasl_plain_username=USERNAME,
sasl_plain_password=PASSWORD,
group_id="my-group",
auto_offset_reset="earliest",
consumer_timeout_ms=10000
)
print("Successfully connected to Kafka broker.")
print(f"Listening to topic: {TOPIC}")
for message in consumer:
print(f"Received message: {message.value.decode('utf-8')}")
break
consumer.close()
except KafkaError as e:
print(f"Failed to connect to Kafka broker: {e}")
This is the actual exception, i tried using bootstrap server LB FQDN and each of the 3 brokers LB FQDNs and no luck :(
Failed to connect to Kafka broker: NoBrokersAvailable
pods are healthy and services and user are available
kubectl get pods -n strimzi-kafka
NAME READY STATUS RESTARTS AGE
my-cluster-entity-operator-6f4cc9b654-frzsw 2/2 Running 0 53m
my-cluster-kafka-broker-0 1/1 Running 0 16m
my-cluster-kafka-broker-1 1/1 Running 0 15m
my-cluster-kafka-broker-2 1/1 Running 0 16m
my-cluster-kraft-controller-3 1/1 Running 0 60m
my-cluster-kraft-controller-4 1/1 Running 0 61m
my-cluster-kraft-controller-5 1/1 Running 0 60m
strimzi-cluster-operator-6f9fbb4c75-twx8p 1/1 Running 0 75m
kubectl get services -n strimzi-kafka
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
my-cluster-kafka-bootstrap ClusterIP 10.100.234.142 <none> 9091/TCP,9093/TCP 15d
my-cluster-kafka-broker-plain-0 LoadBalancer 10.100.193.37 awslbfqdn9092:31351/TCP 17m
my-cluster-kafka-broker-plain-1 LoadBalancer 10.100.234.209 awslbfqdn9092:31518/TCP 17m
my-cluster-kafka-broker-plain-2 LoadBalancer 10.100.233.6 awslbfqdn9092:31231/TCP 17m
my-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,8443/TCP,9093/TCP 15d
my-cluster-kafka-plain-bootstrap LoadBalancer 10.100.13.61 awslbfqdn9092:31631/TCP 17m
kubectl get kafkausers -n strimzi-kafka
NAME CLUSTER AUTHENTICATION AUTHORIZATION READY
flink my-cluster scram-sha-512 simple True
Upvotes: 1
Views: 22