Danielle Paquette-Harvey

Reputation: 1781

Kafka consumer.poll hangs with bitnami container

I have the latest bitnami Kafka container installed on a remote server. The startup logs show:

[2021-04-07 18:05:38,263] INFO Client environment:zookeeper.version=3.5.8-f439ca583e70862c3068a1f2a7d4d068eec33315, built on 05/04/2020 15:53 GMT (org.apache.zookeeper.ZooKeeper)
[2021-04-07 18:05:40,137] INFO Kafka version: 2.7.0 (org.apache.kafka.common.utils.AppInfoParser)

My Kafka is configured to allow external connections:

kafka:
  image: 'bitnami/kafka:latest'
  container_name: kafka
  ports:
    - '9092:9092'
    - '9093:9093'
  environment:
    - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
    - ALLOW_PLAINTEXT_LISTENER=yes
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
    - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
    - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
    - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT

Ping and telnet to the IP address both work.

I am able to run a producer and send data in Python:

from json import dumps
from kafka import KafkaProducer

# Producer ---------------------------------------------------------------
producer = KafkaProducer(
    bootstrap_servers=['192.xxx.xx.xx:9093'],
    value_serializer=lambda x: dumps(x).encode('utf-8'))

producer.send('TestTopic1', value='MyTest')

But I am unable to consume the data: the script hangs at consumer.poll and never reaches the next line.

from json import loads
from kafka import KafkaConsumer

# Consumer ---------------------------------------------------------------
consumer = KafkaConsumer(
    'TestTopic1',
    bootstrap_servers=['192.xxx.xx.xx:9093'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    group_id='testgroup',
    value_deserializer=lambda x: loads(x.decode('utf-8')))

# I've tried both with group_id=None and with a group_id set.

print('BEFORE subscribe: ')
consumer.subscribe(['TestTopic1'])

print('BEFORE poll: ')
# HANGS HERE!! Never gets to the print after
consumer.poll(timeout_ms=500)

print('AFTER POLL: ')
consumer.seek_to_beginning()

print('partitions of the topic: ', consumer.partitions_for_topic('TestTopic1'))

for msg in consumer:
    print(type(msg))

In the Kafka logs, I can see the topic getting created, as well as other lines I'm not sure how to interpret.

[2021-04-07 18:05:40,234] INFO [broker-1001-to-controller-send-thread]: Recorded new controller, from now on will use broker 1001 (kafka.server.BrokerToControllerRequestThread)
[2021-04-07 18:06:37,509] INFO Creating topic __consumer_offsets with configuration {compression.type=producer, cleanup.policy=compact, segment.bytes=104857600} and initial partition assignment Map(23 -> ArrayBuffer(1001), 32 -> ArrayBuffer(1001), 41 -> ArrayBuffer(1001), 17 -> ArrayBuffer(1001), 8 -> ArrayBuffer(1001), 35 -> ArrayBuffer(1001), 44 -> ArrayBuffer(1001), 26 -> ArrayBuffer(1001), 11 -> ArrayBuffer(1001), 29 -> ArrayBuffer(1001), 38 -> ArrayBuffer(1001), 47 -> ArrayBuffer(1001), 20 -> ArrayBuffer(1001), 2 -> ArrayBuffer(1001), 5 -> ArrayBuffer(1001), 14 -> ArrayBuffer(1001), 46 -> ArrayBuffer(1001), 49 -> ArrayBuffer(1001), 40 -> ArrayBuffer(1001), 13 -> ArrayBuffer(1001), 4 -> ArrayBuffer(1001), 22 -> ArrayBuffer(1001), 31 -> ArrayBuffer(1001), 16 -> ArrayBuffer(1001), 7 -> ArrayBuffer(1001), 43 -> ArrayBuffer(1001), 25 -> ArrayBuffer(1001), 34 -> ArrayBuffer(1001), 10 -> ArrayBuffer(1001), 37 -> ArrayBuffer(1001), 1 -> ArrayBuffer(1001), 19 -> ArrayBuffer(1001), 28 -> ArrayBuffer(1001), 45 -> ArrayBuffer(1001), 27 -> ArrayBuffer(1001), 36 -> ArrayBuffer(1001), 18 -> ArrayBuffer(1001), 9 -> ArrayBuffer(1001), 21 -> ArrayBuffer(1001), 48 -> ArrayBuffer(1001), 3 -> ArrayBuffer(1001), 12 -> ArrayBuffer(1001), 30 -> ArrayBuffer(1001), 39 -> ArrayBuffer(1001), 15 -> ArrayBuffer(1001), 42 -> ArrayBuffer(1001), 24 -> ArrayBuffer(1001), 6 -> ArrayBuffer(1001), 33 -> ArrayBuffer(1001), 0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
[2021-04-07 18:06:37,534] INFO [KafkaApi-1001] Auto creation of topic __consumer_offsets with 50 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
[2021-04-07 18:06:37,547] INFO Creating topic TestTopic1 with configuration {} and initial partition assignment Map(0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
[2021-04-07 18:06:37,557] INFO [KafkaApi-1001] Auto creation of topic TestTopic1 with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
[2021-04-07 18:06:37,906] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(__consumer_offsets-22, __consumer_offsets-30, __consumer_offsets-8, __consumer_offsets-21, __consumer_offsets-4, __consumer_offsets-27, __consumer_offsets-7, __consumer_offsets-9, __consumer_offsets-46, __consumer_offsets-25, __consumer_offsets-35, __consumer_offsets-41, __consumer_offsets-33, __consumer_offsets-23, __consumer_offsets-49, __consumer_offsets-47, __consumer_offsets-16, __consumer_offsets-28, __consumer_offsets-31, __consumer_offsets-36, __consumer_offsets-42, __consumer_offsets-3, __consumer_offsets-18, __consumer_offsets-37, __consumer_offsets-15, __consumer_offsets-24, __consumer_offsets-38, __consumer_offsets-17, __consumer_offsets-48, __consumer_offsets-19, __consumer_offsets-11, __consumer_offsets-13, __consumer_offsets-2, __consumer_offsets-43, __consumer_offsets-6, __consumer_offsets-14, __consumer_offsets-20, __consumer_offsets-0, __consumer_offsets-44, __consumer_offsets-39, __consumer_offsets-12, __consumer_offsets-45, __consumer_offsets-1, __consumer_offsets-5, __consumer_offsets-26, __consumer_offsets-29, __consumer_offsets-34, __consumer_offsets-10, __consumer_offsets-32, __consumer_offsets-40) (kafka.server.ReplicaFetcherManager)
[2021-04-07 18:06:37,979] INFO [Log partition=__consumer_offsets-0, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-04-07 18:06:37,991] INFO Created log for partition __consumer_offsets-0 in /bitnami/kafka/data/__consumer_offsets-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> compact, flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 104857600, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-04-07 18:06:37,992] INFO [Partition __consumer_offsets-0 broker=1001] No checkpointed highwatermark is found for partition __consumer_offsets-0 (kafka.cluster.Partition)
[2021-04-07 18:06:37,994] INFO [Partition __consumer_offsets-0 broker=1001] Log loaded for partition __consumer_offsets-0 with initial high watermark 0 (kafka.cluster.Partition)
[2021-04-07 18:06:38,011] INFO [Log partition=__consumer_offsets-29, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)

Then there is another series of messages like this in the log.

[2021-04-07 18:06:38,563] INFO Created log for partition __consumer_offsets-13 in /bitnami/kafka/data/__consumer_offsets-13 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> compact, flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 104857600, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-04-07 18:06:38,563] INFO [Partition __consumer_offsets-13 broker=1001] No checkpointed highwatermark is found for partition __consumer_offsets-13 (kafka.cluster.Partition)
[2021-04-07 18:06:38,563] INFO [Partition __consumer_offsets-13 broker=1001] Log loaded for partition __consumer_offsets-13 with initial high watermark 0 (kafka.cluster.Partition)
[2021-04-07 18:06:38,577] INFO [GroupMetadataManager brokerId=1001] Scheduling loading of offsets and group metadata from __consumer_offsets-22 (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,579] INFO [GroupMetadataManager brokerId=1001] Scheduling loading of offsets and group metadata from __consumer_offsets-25 (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,579] INFO [GroupMetadataManager brokerId=1001] Scheduling loading of offsets and group metadata from __consumer_offsets-28 (kafka.coordinator.group.GroupMetadataManager)
....
[2021-04-07 18:06:38,589] INFO [GroupMetadataManager brokerId=1001] Finished loading offsets and group metadata from __consumer_offsets-22 in 12 milliseconds, of which 2 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,596] INFO [GroupMetadataManager brokerId=1001] Finished loading offsets and group metadata from __consumer_offsets-25 in 17 milliseconds, of which 17 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,597] INFO [GroupMetadataManager brokerId=1001] Finished loading offsets and group metadata from __consumer_offsets-28 in 18 milliseconds, of which 17 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
....
[2021-04-07 18:06:38,638] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(TestTopic1-0) (kafka.server.ReplicaFetcherManager)
[2021-04-07 18:06:38,643] INFO [Log partition=TestTopic1-0, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-04-07 18:06:38,644] INFO Created log for partition TestTopic1-0 in /bitnami/kafka/data/TestTopic1-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-04-07 18:06:38,647] INFO [Partition TestTopic1-0 broker=1001] No checkpointed highwatermark is found for partition TestTopic1-0 (kafka.cluster.Partition)
[2021-04-07 18:06:38,647] INFO [Partition TestTopic1-0 broker=1001] Log loaded for partition TestTopic1-0 with initial high watermark 0 (kafka.cluster.Partition)

I don't see anything related to the consumer in here.

Note that this is only a dev server; we're using it as a proof of concept to decide whether Kafka works for us before rolling it out in production.

Any help would be appreciated, as we'd really like to get this working.

Upvotes: 0

Views: 1073

Answers (1)

OneCricketeer

Reputation: 191710

"installed on a remote server."

Then you need to advertise that server's address in KAFKA_CFG_ADVERTISED_LISTENERS; a port mapping alone isn't sufficient.
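In your compose file, that means pointing the EXTERNAL listener at the address your clients actually reach, a sketch (192.xxx.xx.xx stands for the server's real address, the same one your scripts use for bootstrap):

    - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://192.xxx.xx.xx:9093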

It's timing out because the bootstrap protocol returns the advertised address, so your remote consumer ends up trying to read from localhost:9093 on its own machine.
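If you have kcat (formerly kafkacat) installed, you can confirm this: the -L flag prints the cluster metadata a client receives, including each broker's advertised host and port.

kafkacat -b 192.xxx.xx.xx:9093 -L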

Your producer would have the same issue; on top of that, you never flush the producer, so the record may never actually be sent.
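A minimal sketch of the producer side, under the same assumptions as your script: send() is asynchronous in kafka-python and only enqueues the record, so flush() is needed to force delivery before the process exits.

from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['192.xxx.xx.xx:9093'],
    value_serializer=lambda x: dumps(x).encode('utf-8'))

# send() only buffers the record; flush() blocks until it is
# actually delivered (or delivery fails)
producer.send('TestTopic1', value='MyTest')
producer.flush()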


If you productionize this with a Docker orchestration platform, you'll need to work around additional networking configuration.

Upvotes: 1
