somnathchakrabarti

Reputation: 3118

org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.Integer]

My cluster configuration, class details and jar versions are described in the question "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer".

I have started the Zookeeper server, the Kafka server and the Kafka REST server. Next, I deploy my Spring Boot WAR file, spring-kafka-webhook-service.war, on Tomcat.

When I post messages through the Kafka REST proxy client, I get the error below, which suggests that the @KafkaListener method fails to read the incoming ConsumerRecord. Any input would be highly appreciated.

My Kafka REST properties are currently configured as follows:

confluent-3.3.0/etc/kafka-rest/kafka-rest.properties

id=kafka-rest-test-server
schema.registry.url=http://localhost:8081
zookeeper.connect=localhost:2181

Error log after deploying the WAR on Tomcat:

2017-12-26 09:11:01.143 ERROR 20430 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = inventory, partition = 0, offset = 3, CreateTime = 1514279460946, checksum = 1183108784, serialized key size = -1, serialized value size = 72, key = null, value = InventoryEvent [id=7798, eventType='inventory.transaction', qtyReq='5', qtyLevel='27'])

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.psl.kafka.spring.InventoryEventReceiver.listenWithHeaders(com.psl.kafka.spring.InventoryEvent,java.lang.String,java.lang.Integer,int,java.lang.String)]
Bean [com.psl.kafka.spring.InventoryEventReceiver@798267fb]; nested exception is org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.Integer], failedMessage=GenericMessage [payload=InventoryEvent [id=7798, eventType='inventory.transaction', qtyReq='5', qtyLevel='27'], headers={kafka_offset=3, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:183) ~[spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:568) [spring-kafka-1.1.7.RELEASE.jar:na]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.Integer]
        at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:100) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:103) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:135) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:174) ~[spring-kafka-1.1.7.RELEASE.jar:na]
        ... 8 common frames omitted

Upvotes: 3

Views: 12616

Answers (2)

Alferd Nobel

Reputation: 3979

To make this work, I had to generate a message ID (the key) in the producer and pass it to the send() method, like so:

public void sendMessage(CallStack callStack) {
    String topicName = "my-topic-name";
    String messageId = UUID.randomUUID().toString();
    callStack.setId(messageId);
    ListenableFuture<SendResult<String, CallStack>> future = kafkaTemplate
            .send(topicName, messageId, callStack);

    // This will check producer result asynchronously to avoid thread blocking
    future.addCallback(new ListenableFutureCallback<SendResult<String, CallStack>>() {
        @Override
        public void onFailure(@NotNull Throwable throwable) {
            log.error("Failed to send message", throwable);
        }

        @Override
        public void onSuccess(SendResult<String, CallStack> stringStringSendResult) {
            log.info(String.format("Produced:\ntopic: %s\noffset: %d\npartition: %d\nvalue size: %d",
                    stringStringSendResult.getRecordMetadata().topic(),
                    stringStringSendResult.getRecordMetadata().offset(),
                    stringStringSendResult.getRecordMetadata().partition(),
                    stringStringSendResult.getRecordMetadata().serializedValueSize()));
        }
    });
}

Consumer Service:

@KafkaListener(topics = "perfEvent", containerFactory = "kafkaListenerContainerFactory")
public void consume(final @Payload CallStack callStack,
                    final @Header(KafkaHeaders.OFFSET) Integer offset,
                    final @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                    final @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                    final @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                    final @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
    log.info(String.format("#### -> Consumed message -> TIMESTAMP: %d\n%s\noffset: %d\nkey: %s\npartition: %d\ntopic: %s",
            ts, callStack, offset, key, partition, topic));
    log.info("Persisting message id: {}", callStack.getId());
    callStackRepository.insert(callStack);
}

Upvotes: 0

somnathchakrabarti

Reputation: 3118

I changed the @KafkaListener method so that it takes only the POJO InventoryEvent as a parameter:

InventoryEvent event

instead of

@Payload InventoryEvent event,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) String offset

This solved the issue, because kafka_receivedMessageKey is never sent over Kafka, as explained in this SO answer by Artem Bilan: https://stackoverflow.com/a/32125453/786676
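
For readers who still want the other headers: the error log in the question shows key = null and kafka_receivedMessageKey=null, and Spring's @Header annotation has a required attribute that defaults to true. The sketch below is only a rough, standalone model of that check, not Spring's actual code. The class HeaderResolutionSketch and the method resolveHeader are hypothetical names made up for illustration. Under those assumptions, it shows why a null key is reported as a missing header and why marking the header as optional would likely avoid the exception.

```java
import java.util.HashMap;
import java.util.Map;

public class HeaderResolutionSketch {

    // Hypothetical stand-in for the framework's argument resolution:
    // a header whose value is null is treated the same as an absent header.
    static Object resolveHeader(Map<String, Object> headers, String name, boolean required) {
        Object value = headers.get(name);
        if (value == null && required) {
            throw new IllegalStateException("Missing header '" + name + "'");
        }
        return value;
    }

    public static void main(String[] args) {
        // Headers as they appear in the failing message from the log: the key is null.
        Map<String, Object> headers = new HashMap<>();
        headers.put("kafka_offset", 3L);
        headers.put("kafka_receivedMessageKey", null);
        headers.put("kafka_receivedPartitionId", 0);
        headers.put("kafka_receivedTopic", "inventory");

        // required = true (the @Header default): resolution fails.
        try {
            resolveHeader(headers, "kafka_receivedMessageKey", true);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // required = false: the listener would simply receive null for the key.
        // In Spring terms this corresponds to declaring the parameter as
        // @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key
        Object key = resolveHeader(headers, "kafka_receivedMessageKey", false);
        System.out.println("key = " + key);
    }
}
```

With the parameter declared as optional, the listener method has to handle a null key itself, which is the case for every record posted without a key.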

Upvotes: 3
