Matthias Meerhof

Reputation: 41

Kafka Stream job not processing / consuming

Notes after review

I'm using a dockerized version of Kafka and of the Kafka Streams job. Both are spun up with Docker Compose, one right after the other.

Problem description:

For my thesis I'm working with Kafka Streams. Everything works fine, but it takes a while before the stream job actually starts processing, and it took me a while to figure out why. Apparently it does not process anything until it has gone from the RUNNING state to REBALANCING and back to RUNNING. Does anyone know why this is and what I could do to start processing immediately? Maybe I'm missing some configuration.

I'm using Confluent's Kafka REST API to submit test messages to the input topics.

The log just before it starts the actual processing looks like this:

INFO org.apache.kafka.clients.Metadata - Cluster ID: aKDudbTgTSq9gY-M6eHqyw

INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=aggregation-item-brand-prototype-823ba6d0-5b02-4a92-ac64-592b6d3e4188-StreamThread-1-consumer, groupId=aggregation-item-brand-prototype] Revoking previously assigned partitions []

INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [aggregation-item-brand-prototype-823ba6d0-5b02-4a92-ac64-592b6d3e4188-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED

INFO org.apache.kafka.streams.KafkaStreams - stream-client [aggregation-item-brand-prototype-823ba6d0-5b02-4a92-ac64-592b6d3e4188] State transition from RUNNING to REBALANCING

INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [aggregation-item-brand-prototype-823ba6d0-5b02-4a92-ac64-592b6d3e4188-StreamThread-1] partition revocation took 0 ms.
   suspended active tasks: []
   suspended standby tasks: []

INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=aggregation-item-brand-prototype-823ba6d0-5b02-4a92-ac64-592b6d3e4188-StreamThread-1-consumer, groupId=aggregation-item-brand-prototype] (Re-)joining group

INFO org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor - stream-thread [aggregation-item-brand-prototype-823ba6d0-5b02-4a92-ac64-592b6d3e4188-StreamThread-1-consumer] Assigned tasks to clients as {823ba6d0-5b02-4a92-ac64-592b6d3e4188=[activeTasks: ([1_0]) standbyTasks: ([]) assignedTasks: ([1_0]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.

WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=aggregation-item-brand-prototype-823ba6d0-5b02-4a92-ac64-592b6d3e4188-StreamThread-1-consumer, groupId=aggregation-item-brand-prototype] The following subscribed topics are not assigned to any members: [product-brand-withkey]

INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=aggregation-item-brand-prototype-823ba6d0-5b02-4a92-ac64-592b6d3e4188-StreamThread-1-consumer, groupId=aggregation-item-brand-prototype] Successfully joined group with generation 2

INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=aggregation-item-brand-prototype-823ba6d0-5b02-4a92-ac64-592b6d3e4188-StreamThread-1-consumer, groupId=aggregation-item-brand-prototype] Setting newly assigned partitions [product-item-withkey-0]

INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [aggregation-item-brand-prototype-823ba6d0-5b02-4a92-ac64-592b6d3e4188-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED

INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [aggregation-item-brand-prototype-823ba6d0-5b02-4a92-ac64-592b6d3e4188-StreamThread-1] partition assignment took 19 ms.
   current active tasks: [1_0]
   current standby tasks: []
   previous active tasks: []

INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=aggregation-item-brand-prototype-823ba6d0-5b02-4a92-ac64-592b6d3e4188-StreamThread-1-consumer, groupId=aggregation-item-brand-prototype] Resetting offset for partition product-item-withkey-0 to offset 0.

INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [aggregation-item-brand-prototype-823ba6d0-5b02-4a92-ac64-592b6d3e4188-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING

INFO org.apache.kafka.streams.KafkaStreams - stream-client [aggregation-item-brand-prototype-823ba6d0-5b02-4a92-ac64-592b6d3e4188] State transition from REBALANCING to RUNNING

Upvotes: 3

Views: 6703

Answers (1)

Matthias Meerhof

Reputation: 41

I found out that this problem had already been posted, under the title "Why does Kafka consumer takes long time to start consuming?"

I'm using a dockerized version of Kafka, and Kafka and the Kafka Streams job are spun up together. Although the topics are created up front by Kafka, they are still in leader election when the Kafka Streams job starts to consume. As a result, the consumer cannot fetch any metadata about the topic and therefore cannot consume messages. Only after the next metadata refresh, whose interval is controlled by the metadata.max.age.ms parameter, does the consumer actually start to consume.
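As an alternative to waiting, the refresh interval itself could be lowered. The following is only a minimal sketch of how metadata.max.age.ms might be set in the properties handed to the Kafka Streams job; the broker's default for this client setting is 300000 ms (5 minutes). The class name StreamsStartupConfig, the broker address kafka:9092 and the 5 second value are assumptions for illustration and are not taken from my actual setup. The application.id matches the groupId seen in the log above.

```java
import java.util.Properties;

class StreamsStartupConfig {

    // Builds the client properties with a shortened metadata refresh interval,
    // so that a topic whose leader was not yet elected is rediscovered sooner.
    static Properties build(String applicationId, String bootstrapServers, long metadataMaxAgeMs) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Client default is 300000 ms; a smaller value forces more frequent refreshes.
        props.setProperty("metadata.max.age.ms", Long.toString(metadataMaxAgeMs));
        return props;
    }

    public static void main(String[] args) {
        // "kafka:9092" and 5000 ms are placeholder values for this sketch.
        Properties props = build("aggregation-item-brand-prototype", "kafka:9092", 5_000L);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object would then be passed to the KafkaStreams constructor together with the topology. A very small refresh interval means more metadata requests to the brokers, so this is probably better suited to a test setup than to production.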

I fixed this by putting a sleep of 30 seconds in the start shell script of the Kafka Streams job to wait out the election. Now it starts consuming immediately.
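A fixed sleep works, but it always waits the full time. A possible refinement, sketched here and not what I actually ran, is to poll a readiness check with a timeout before starting the streams job. The class StartupGate and the method awaitReady are hypothetical names; the readiness check is left as a plain BooleanSupplier, which in a real job could for example ask the broker whether every input partition has a leader.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

class StartupGate {

    // Polls `ready` until it returns true or `timeout` has elapsed.
    // Returns true if the check succeeded, false on timeout or interruption.
    static boolean awaitReady(BooleanSupplier ready, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (ready.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in check for this sketch: reports "ready" after roughly two seconds.
        BooleanSupplier topicsReady = () -> System.currentTimeMillis() - start > 2_000;
        boolean ok = awaitReady(topicsReady, Duration.ofSeconds(30), Duration.ofMillis(500));
        System.out.println(ok ? "ready, starting streams job" : "timed out waiting for topics");
    }
}
```

With the 30 second timeout this waits no longer than the original sleep, but it returns as soon as the check passes.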

Upvotes: 1
