Reputation: 1
I have two microservices (A and B). Microservice A is producing events for Kafka. I checked the data on my Kafka instance and I can verify that they are stored there. However, my consumer B is not reacting to these events.
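(For reference, a throwaway plain-Kafka consumer along these lines can be used for that kind of check. The group id, the timeout and the localhost:29092 address are only for this snippet; the port matches the host listener in the compose file below.)

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TopicCheck {

    public static void main(String[] args) {
        Properties props = new Properties();
        // 29092 is the listener advertised to the docker host in the compose file below
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        // arbitrary group id, only used for this one-off check
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-check");
        // start reading from the beginning of the topic
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // the producer writes transactionally, so only show committed records
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("local.event"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), new String(record.value()));
            }
        }
    }
}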
docker-compose.yml:
zookeeper:
  container_name: zookeeper-service
  image: 'bitnami/zookeeper:latest'
  ports:
    - '2181:2181'
  environment:
    - ALLOW_ANONYMOUS_LOGIN=yes
  networks:
    - app-tier
kafka:
  container_name: kafka-service
  image: 'bitnami/kafka:latest'
  ports:
    - '9092:9092'
    - '29092:29092'
  volumes:
    - ./kafka-persistence:/bitnami/kafka
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    ALLOW_PLAINTEXT_LISTENER: "yes"
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_LISTENERS: PLAINTEXT://:9092,PLAINTEXT_HOST://0.0.0.0:29092
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
  depends_on:
    - zookeeper
  networks:
    - app-tier
Microservice A (Producer)
application.yml:
axon:
  serializer:
    general: jackson
    messages: jackson
    events: jackson
  kafka:
    bootstrap-servers: kafka:9092
    client-id: myproducer
    default-topic: local.event
    properties:
      security.protocol: PLAINTEXT
    producer:
      transaction-id-prefix: kafka-sample
      retries: 5
Sender: As the name suggests, this class sends events to the Kafka bus.
@Component
public class Sender {

    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private EventBus eventBus;

    public <T> void send(T event) {
        LOGGER.info("publishing event {}", event);
        EventMessage<T> eventMessage = GenericEventMessage.asEventMessage(event);
        eventBus.publish(eventMessage);
    }
}
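For context, the Sender is invoked roughly as follows. The REST controller below is purely illustrative and not the actual entry point of microservice A:

import java.util.UUID;

import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

// Illustrative caller only: shows how Sender is meant to be used.
@RestController
@RequestMapping("/projekte")
public class ObjektfotoController {

    private final Sender sender;

    public ObjektfotoController(Sender sender) {
        this.sender = sender;
    }

    // Publishes the event that is expected to end up on the local.event topic.
    @PostMapping("/{projektId}/objektfoto")
    public void objektfotoFestlegen(@PathVariable UUID projektId, @RequestBody String objektfoto) {
        sender.send(new ObjektfotoDesProjektsFestgelegt(projektId, objektfoto));
    }
}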
Microservice B (Consumer)
application.yml:
axon:
  serializer:
    general: jackson
    messages: jackson
    events: jackson
  eventhandling:
    processors:
      MyProcessor:
        source: streamableKafkaMessageSource
        mode: TRACKING
        threadCount: 1
        batchSize: 1
  kafka:
    bootstrap-servers: kafka:9092
    client-id: myconsumer
    default-topic: local.event
    properties:
      security.protocol: PLAINTEXT
    consumer:
      event-processor-mode: tracking
KafkaEventConsumer:
@Component
@ProcessingGroup("MyProcessor")
public class KafkaEventConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEventConsumer.class);

    @EventHandler
    public void handleMyEvent(ObjektfotoDesProjektsFestgelegt myEvent) {
        LOGGER.info("got the event {}", myEvent);
        System.out.println("_____ EVENT CONSUMED ______");
    }
}
The event ObjektfotoDesProjektsFestgelegt is present in both microservices. I couldn't find any information on how the microservices understand that they are talking about the same event; I am guessing that they just match on the name. So this event class is a duplicate and therefore present in both microservice A and B:
@Getter
@RequiredArgsConstructor
public class ObjektfotoDesProjektsFestgelegt {

    private final UUID projektId;
    private final String objektfoto;
}
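For what it's worth, a quick scratch main like the one below (not part of either service) can be used to inspect what the jackson serializer from the application.yml actually attaches to the payload:

import java.util.UUID;

import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.json.JacksonSerializer;

public class SerializationCheck {

    public static void main(String[] args) {
        JacksonSerializer serializer = JacksonSerializer.defaultSerializer();

        ObjektfotoDesProjektsFestgelegt event =
                new ObjektfotoDesProjektsFestgelegt(UUID.randomUUID(), "objektfoto.jpg");

        SerializedObject<byte[]> serialized = serializer.serialize(event, byte[].class);

        // The serialized type name is the fully-qualified class name of the event,
        // so the class has to live under the same package in both services for the names to match.
        System.out.println("payload type: " + serialized.getType().getName());
        System.out.println("payload body: " + new String(serialized.getData()));
    }
}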
Besides being able to verify the event on Kafka itself, I also see the following logs inside microservice A (the producer) showing the relevant activity:
2022-10-20 18:16:43.522 INFO 1 --- [nio-8083-exec-3] o.a.k.c.p.internals.TransactionManager : [Producer clientId=myproducer, transactionalId=kafka-sample1] Invoking InitProducerId for the first time in order to acquire a producer ID
2022-10-20 18:16:43.531 INFO 1 --- [ad | myproducer] o.a.k.c.p.internals.TransactionManager : [Producer clientId=myproducer, transactionalId=kafka-sample1] Discovered transaction coordinator kafka:9092 (id: 1 rack: null)
2022-10-20 18:16:43.638 INFO 1 --- [ad | myproducer] o.a.k.c.p.internals.TransactionManager : [Producer clientId=myproducer, transactionalId=kafka-sample1] ProducerId set to 1 with epoch 0
2022-10-20 19:13:49.050 INFO 1 --- [nio-8083-exec-4] d.g.datei.domain.model.datei.Sender : publishing event de.****.*****.domain.model.datei.events.ObjektfotoDesProjektsFestgelegt@3b8cd75a
On the other side, my consumer is not reacting at all:
2022-10-20 18:16:44.115 INFO 1 --- [[MyProcessor]-0] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.7.1
2022-10-20 18:16:44.115 INFO 1 --- [[MyProcessor]-0] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 61dbce85d0d41457
2022-10-20 18:16:44.115 INFO 1 --- [[MyProcessor]-0] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1666289804115
2022-10-20 18:16:44.121 INFO 1 --- [[MyProcessor]-0] org.apache.kafka.clients.Metadata : [Consumer clientId=myconsumer, groupId=null] Cluster ID: QGf9v-SrTlyAV-Ug9yOp_w
2022-10-20 18:16:44.125 INFO 1 --- [[MyProcessor]-0] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=myconsumer, groupId=null] Subscribed to partition(s): local.event-0
2022-10-20 18:16:44.132 INFO 1 --- [[MyProcessor]-0] o.a.e.k.e.consumer.ConsumerSeekUtil : Seeking topic-partition [local.event-0] with offset [0]
2022-10-20 18:16:44.132 INFO 1 --- [[MyProcessor]-0] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=myconsumer, groupId=null] Seeking to offset 0 for partition local.event-0
Furthermore, both microservices have the following dependencies available:
<properties>
    <java.version>1.8</java.version>
    <axon.version>4.6.0</axon.version>
    <version.mapstruct>1.4.2.Final</version.mapstruct>
    <lombok.version>1.18.16</lombok.version>
    <version.mapstruct-lombok>0.2.0</version.mapstruct-lombok>
</properties>
....
<dependency>
    <groupId>org.axonframework.extensions.kafka</groupId>
    <artifactId>axon-kafka-spring-boot-starter</artifactId>
    <version>${axon.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Why is my consumer not reacting at all?
Upvotes: 0
Views: 184
Reputation: 401
You might still need to register the processor, like here. To use the same events in multiple microservices, it's best to put them in a separate, shared module, like here. I hope this was helpful.
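In case it helps, a minimal sketch of that registration in microservice B could look roughly like this, assuming the axon-kafka-spring-boot-starter has auto-configured a StreamableKafkaMessageSource<String, byte[]> bean for you (it should, with event-processor-mode set to tracking as in your application.yml):

import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaProcessorConfig {

    // Ties the "MyProcessor" processing group to the Kafka message source so that
    // the @EventHandler in KafkaEventConsumer is actually fed from the local.event topic.
    @Autowired
    public void registerKafkaProcessor(EventProcessingConfigurer configurer,
                                       StreamableKafkaMessageSource<String, byte[]> streamableKafkaMessageSource) {
        configurer.registerTrackingEventProcessor("MyProcessor",
                configuration -> streamableKafkaMessageSource);
    }
}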
Upvotes: 1