Reputation: 3118
My cluster configuration, class details and jar versions are mentioned in the question org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
I have started Zookeeper-server, Kafka-server and Kafka REST server. Next I am deploying my spring-boot war file named spring-kafka-webhook-service.war
file on tomcat.
As I am posting messages through Kafka REST proxy client, I am getting the below error, which probably suggests the @KafkaListener
method is failing to read ConsumerRecord
incoming message. Any inputs will be highly appreciated.
My Kafka-Rest properties is currently configured as below:
confluent-3.3.0/etc/kafka-rest/kafka-rest.properties
id=kafka-rest-test-server
schema.registry.url=http://localhost:8081
zookeeper.connect=localhost:2181
Error Log after war deployment on tomcat
2017-12-26 09:11:01.143 ERROR 20430 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = inventory, partition = 0, offset = 3, CreateTime = 1514279460946, checksum = 1183108784, serialized key size = -1, serialized value size = 72, key = null, value = InventoryEvent [id=7798, eventType='inventory.transaction', qtyReq='5', qtyLevel='27'])
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.psl.kafka.spring.InventoryEventReceiver.listenWithHeaders(com.psl.kafka.spring.InventoryEvent,java.lang.String,java.lang.Integer,int,java.lang.String)]
Bean [com.psl.kafka.spring.InventoryEventReceiver@798267fb]; nested exception is org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.Integer], failedMessage=GenericMessage [payload=InventoryEvent [id=7798, eventType='inventory.transaction', qtyReq='5', qtyLevel='27'], headers={kafka_offset=3, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:183) ~[spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:568) [spring-kafka-1.1.7.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.Integer]
at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:100) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:103) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:135) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:174) ~[spring-kafka-1.1.7.RELEASE.jar:na]
... 8 common frames omitted
Upvotes: 3
Views: 12616
Reputation: 3979
In order to make this work I had to pass a messageId(key) from the Producer and pass it in the send() method. Like so:
public void sendMessage(CallStack callStack) {
String topicName = "my-topic-name";
String messageId = UUID.randomUUID().toString();
callStack.setId(messageId);
ListenableFuture<SendResult<String, CallStack>> future = kafkaTemplate
.send(topicName,messageId,callStack);
//This will check producer result asynchronously to avoid thread blocking
future.addCallback(new ListenableFutureCallback<SendResult<String, CallStack>>() {
@Override
public void onFailure(@NotNull Throwable throwable) {
log.error("Failed to send message", throwable);
}
@Override
public void onSuccess(SendResult<String, CallStack> stringStringSendResult) {
log.info(String.format("Produced:\ntopic: %s\noffset: %d\npartition: %d\nvalue size: %d", stringStringSendResult.getRecordMetadata().topic(),
stringStringSendResult.getRecordMetadata().offset(),
stringStringSendResult.getRecordMetadata().partition(),
stringStringSendResult.getRecordMetadata().serializedValueSize()));
}
});
}
Consumer Service:
@KafkaListener(topics = "perfEvent", containerFactory = "kafkaListenerContainerFactory")
public void consume(final @Payload CallStack callStack,
final @Header(KafkaHeaders.OFFSET) Integer offset,
final @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
final @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
final @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
final @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
log.info(String.format("#### -> Consumed message -> TIMESTAMP: %d\n%s\noffset: %d\nkey: %s\npartition: %d\ntopic: %s",
ts, callStack, offset, key, partition, topic));
log.info("Persisting message id: {}",callStack.getId());
callStackRepository.insert(callStack);
}
Upvotes: 0
Reputation: 3118
Used the @KafkaListener using a method with only POJO "InventoryEvent" as a param
InventoryEvent event
instead of
@Payload InventoryEvent event,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) String offset
This solved the issue as kafka_receivedMessageKey is never sent over Kafka as specified in this SO answer by Artem Bilan https://stackoverflow.com/a/32125453/786676
Upvotes: 3