rajjav123
rajjav123

Reputation: 13

Springboot Kafka Producer Error Handling with Cloud Stream Binder

I created a Springboot application to push message to a Kafka Topic. The application is working fine. What I am trying is to handle the exceptions when there is a failure while sending the messages to Kafka Topic. I am using the Error Channel to track the errors while sending a message. But the actual issue is, I am able to see the error message, but I am not able to see the Actual payload which got failed in the error message. Actually , I want to log that Payload.

The JSON Message that I am trying to send : {"key1":"value1"}

Service class :

@AllArgsConstructor
@EnableBinding(Source.class)
public class SendMessageToKafka {

    private final Source source;

    public void sendMessage(String sampleMessage) {
        source.output.send(MessageBuilder.withPayLoad(sampleMessage).build());
    }

    @ServiceActivator(inputChannel = "errorChannel")
    public void errorHandler(ErrorMessage em) {
        System.out.println(em);
    }
}

application.yml:

spring:
 cloud:
  stream:
   bindings:
    output:
     producer:
      error-channel-enabled: true

With the above configuration, when the Kafka server is down, the control is coming to the errorHandler method and printing the message. But I am not able to see the actual payload which is {"key1":"value1"} from the error message. How can I retrieve that from the error message?

Upvotes: 1

Views: 1506

Answers (1)

awgtek
awgtek

Reputation: 1839

You can filter the payload based on type, e.g. KafkaSendFailureException (https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_spring_integration.html indicates the ErrorMessage payload is this type.) After that what worked in my case is to cast it down to the original sent message as follows (analyze the objects via breakpoint to determine the appropriate value type, e.g. byte[]):

@ServiceActivator(inputChannel = "errorChannel")
public void errorHandler(ErrorMessage em) {
    log.debug("got error message over errorChannel: {}", em);
    if (null != em.getPayload() && em.getPayload() instanceof KafkaSendFailureException) {
        KafkaSendFailureException kafkaSendFailureException = (KafkaSendFailureException) em.getPayload();
        if (kafkaSendFailureException.getRecord() != null && kafkaSendFailureException.getRecord().value() != null
                && kafkaSendFailureException.getRecord().value() instanceof byte[]) {
            log.warn("error channel message. Payload {}", new String((byte[])(kafkaSendFailureException.getRecord().value())));
        }
    }
}

Upvotes: 1

Related Questions