Gregory

Reputation: 1

Axon: Consumer is not handling Kafka events

I have two microservices (A and B). Microservice A produces events and publishes them to Kafka. I checked my Kafka instance and can verify that the events are stored there. However, my consumer, microservice B, does not react to these events.

docker-compose.yml:

zookeeper:
    container_name: zookeeper-service
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    networks:
      - app-tier

kafka:
    container_name: kafka-service
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
      - '29092:29092'
    volumes:
      - ./kafka-persistence:/bitnami/kafka
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://:9092,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    depends_on:
      - zookeeper
    networks:
      - app-tier

Microservice A (Producer)

application.yml:

axon:
  serializer:
    general: jackson
    messages: jackson
    events: jackson
  kafka:
    bootstrap-servers: kafka:9092
    client-id: myproducer
    default-topic: local.event
    properties:
      security.protocol: PLAINTEXT
    producer:
      transaction-id-prefix: kafka-sample
      retries: 5

Sender: As the name suggests, this class is sending events to the Kafka bus.

@Component
public class Sender {

    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private EventBus eventBus;

    public <T> void send(T event) {
        LOGGER.info("publishing event {}", event);
        EventMessage<T> eventMessage = GenericEventMessage.asEventMessage(event);
        eventBus.publish(eventMessage);
    }
}

Microservice B (Consumer)

application.yml:

axon:
  serializer:
    general: jackson
    messages: jackson
    events: jackson
  eventhandling:
    processors:
      MyProcessor:
        source: streamableKafkaMessageSource
        mode: TRACKING
        threadCount: 1
        batchSize: 1
  kafka:
    bootstrap-servers: kafka:9092
    client-id: myconsumer
    default-topic: local.event
    properties:
      security.protocol: PLAINTEXT
    consumer:
      event-processor-mode: tracking

KafkaEventConsumer:

@Component
@ProcessingGroup("MyProcessor")
public class KafkaEventConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEventConsumer.class);

    @EventHandler
    public void handleMyEvent(ObjektfotoDesProjektsFestgelegt myEvent){
        LOGGER.info("got the event {}", myEvent);
        System.out.println("_____ EVENT CONSUMED ______");
    }

}

The event ObjektfotoDesProjektsFestgelegt is present in both microservices. I couldn't find any information on how the microservices determine that they are talking about the same event. My guess is that they simply match on the class name. The event class is therefore duplicated and present in both microservice A and B:

@Getter
@RequiredArgsConstructor
public class ObjektfotoDesProjektsFestgelegt {
    private final UUID projektId;
    private final String objektfoto;
}
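On the name-matching guess: as far as I understand, Axon's serializers record the payload type as the fully qualified class name by default, so the package would have to match as well, not just the simple name. That is an assumption about default behaviour worth verifying against the Axon documentation. The sketch below uses only the JDK to illustrate the idea; `PayloadTypeLookup` and `resolve` are hypothetical names, and `java.util.UUID` stands in for an event class on the consumer's classpath.

```java
import java.util.Optional;

public class PayloadTypeLookup {

    // Hypothetical helper: try to map the type name recorded by the producer
    // to a class that is available on the consumer's classpath.
    static Optional<Class<?>> resolve(String payloadTypeName) {
        try {
            return Optional.of(Class.forName(payloadTypeName));
        } catch (ClassNotFoundException e) {
            // The consumer has no class with this fully qualified name.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Same fully qualified name on both sides: the type can be resolved.
        System.out.println(resolve("java.util.UUID").isPresent());
        // Same simple name but a different package: the lookup fails.
        System.out.println(resolve("com.example.other.UUID").isPresent());
    }
}
```

If the duplicated event class lives in a different package in microservice B, a lookup of this kind would not find it, which is one thing to rule out here.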

While I can verify that Kafka received the event, I also have the following logs inside microservice A (producer) that show me some relevant activity:

2022-10-20 18:16:43.522  INFO 1 --- [nio-8083-exec-3] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=myproducer, transactionalId=kafka-sample1] Invoking InitProducerId for the first time in order to acquire a producer ID

2022-10-20 18:16:43.531  INFO 1 --- [ad | myproducer] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=myproducer, transactionalId=kafka-sample1] Discovered transaction coordinator kafka:9092 (id: 1 rack: null)

2022-10-20 18:16:43.638  INFO 1 --- [ad | myproducer] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=myproducer, transactionalId=kafka-sample1] ProducerId set to 1 with epoch 0

2022-10-20 19:13:49.050  INFO 1 --- [nio-8083-exec-4] d.g.datei.domain.model.datei.Sender      : publishing event de.****.*****.domain.model.datei.events.ObjektfotoDesProjektsFestgelegt@3b8cd75a

On the other side, my consumer is not reacting at all:

2022-10-20 18:16:44.115  INFO 1 --- [[MyProcessor]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.7.1

2022-10-20 18:16:44.115  INFO 1 --- [[MyProcessor]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 61dbce85d0d41457

2022-10-20 18:16:44.115  INFO 1 --- [[MyProcessor]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1666289804115

2022-10-20 18:16:44.121  INFO 1 --- [[MyProcessor]-0] org.apache.kafka.clients.Metadata        : [Consumer clientId=myconsumer, groupId=null] Cluster ID: QGf9v-SrTlyAV-Ug9yOp_w

2022-10-20 18:16:44.125  INFO 1 --- [[MyProcessor]-0] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=myconsumer, groupId=null] Subscribed to partition(s): local.event-0

2022-10-20 18:16:44.132  INFO 1 --- [[MyProcessor]-0] o.a.e.k.e.consumer.ConsumerSeekUtil      : Seeking topic-partition [local.event-0] with offset [0]

2022-10-20 18:16:44.132  INFO 1 --- [[MyProcessor]-0] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=myconsumer, groupId=null] Seeking to offset 0 for partition local.event-0

Furthermore, both microservices have the following dependencies available:

    <properties>
        <java.version>1.8</java.version>
        <axon.version>4.6.0</axon.version>
        <version.mapstruct>1.4.2.Final</version.mapstruct>
        <lombok.version>1.18.16</lombok.version>
        <version.mapstruct-lombok>0.2.0</version.mapstruct-lombok>
    </properties>
....
<dependency>
    <groupId>org.axonframework.extensions.kafka</groupId>
    <artifactId>axon-kafka-spring-boot-starter</artifactId>
    <version>${axon.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Why is it that my consumer is not reacting at all?

Upvotes: 0

Views: 184

Answers (1)

Gerard Klijs

Reputation: 401

You might still need to register the processor, like here. To be able to use events in multiple microservices, it's best to create a separate module, like here. I hope this was helpful.
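A minimal sketch of what registering the processor could look like, assuming the Kafka extension's Spring Boot starter already exposes a `StreamableKafkaMessageSource<String, byte[]>` bean (which I believe it does when the consumer's event-processor-mode is set to tracking). The class name `KafkaProcessorConfig` and the method name are made up for illustration; the processor name has to match the `@ProcessingGroup` value.

```java
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaProcessorConfig { // hypothetical class name

    // Assumption: the starter provides the StreamableKafkaMessageSource bean.
    @Autowired
    public void registerProcessor(
            EventProcessingConfigurer configurer,
            StreamableKafkaMessageSource<String, byte[]> streamableKafkaMessageSource) {
        // Bind the "MyProcessor" processing group to a tracking processor
        // that reads its events from the Kafka message source.
        configurer.registerTrackingEventProcessor(
                "MyProcessor", configuration -> streamableKafkaMessageSource);
    }
}
```

If the processor is already picked up from application.yml, this explicit registration should be redundant rather than harmful, but it removes one unknown while debugging.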

Upvotes: 1
