Mayank Bhargava

Reputation: 11

Unable to read exception header from DLQ spring cloud stream kafka

I am using a Spring Cloud Stream Kafka listener to read messages from a Kafka topic and, in case of an exception, send them to a dead letter queue configured with the following properties:

spring.cloud.stream.kafka.bindings.input.consumer.enable-dlq=true
spring.cloud.stream.kafka.bindings.input.consumer.dlq-name=book_error

I am able to send the message to the DLQ. However, when I try to read from the DLQ so that I can take action based on the error message, I am not able to read the exception embedded in the headers.

@StreamListener("dlqChannel")
public void error(Message<?> message) {
    System.out.println("Handling ERROR: READING FROM DLQ");
    logger.info("header :" +message.getHeaders());
    logger.info("payload : " +message.getPayload());
    //return message;
}

The header values appear as object references that I cannot decipher. How do I parse the error and handle it based on the exception message? Below are the headers I get when I print them:

header :{x-original-offset=[B@f82eb25, x-original-partition=[B@7a3b83c, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedTopic=book_errors, kafka_offset=0, x-exception-message=[B@6dcc9872, x-exception-fqcn=[B@68079694, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@70449372, x-original-topic=[B@6a3ca71e, x-original-timestamp-type=[B@63baad23, kafka_receivedPartitionId=0, contentType=application/json, x-original-timestamp=[B@37dd34f6, kafka_receivedTimestamp=1579990310188, kafka_groupId=bkerrgrp, x-exception-stacktrace=[B@6356ee7c}

Upvotes: 1

Views: 1669

Answers (1)

Gary Russell

Reputation: 174504

The exception headers are byte[] - that's the only type Kafka supports. The various String-valued headers are stored as String.getBytes(StandardCharsets.UTF_8).

Use

String exceptionMessage = new String(message.getHeaders().get("x-exception-message", byte[].class), StandardCharsets.UTF_8);

Numeric values are stored using the appropriate types (int, long).

Use ByteBuffer.wrap(header).getInt(), ByteBuffer.wrap(header).getLong(), etc. for those.
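Putting both together, a DLQ listener along these lines should work; this is a minimal sketch based on the header names from your question (the class name DlqListener is just illustrative):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;

public class DlqListener {

    @StreamListener("dlqChannel")
    public void error(Message<?> message) {
        // String-valued headers are stored as UTF-8 bytes
        String exceptionMessage = new String(
                message.getHeaders().get("x-exception-message", byte[].class),
                StandardCharsets.UTF_8);
        String originalTopic = new String(
                message.getHeaders().get("x-original-topic", byte[].class),
                StandardCharsets.UTF_8);

        // Numeric headers are stored as raw int/long bytes
        int originalPartition = ByteBuffer.wrap(
                message.getHeaders().get("x-original-partition", byte[].class)).getInt();
        long originalOffset = ByteBuffer.wrap(
                message.getHeaders().get("x-original-offset", byte[].class)).getLong();

        // Decide what to do with the failed record based on the exception message
        System.out.println("Failed record from " + originalTopic + "-" + originalPartition
                + "@" + originalOffset + ": " + exceptionMessage);
    }
}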

Here is the code that stores the headers...

kafkaHeaders.add(
        new RecordHeader(X_ORIGINAL_TOPIC, record.topic().getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_ORIGINAL_PARTITION,
        ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array()));
kafkaHeaders.add(new RecordHeader(X_ORIGINAL_OFFSET,
        ByteBuffer.allocate(Long.BYTES).putLong(record.offset()).array()));
kafkaHeaders.add(new RecordHeader(X_ORIGINAL_TIMESTAMP,
        ByteBuffer.allocate(Long.BYTES).putLong(record.timestamp()).array()));
kafkaHeaders.add(new RecordHeader(X_ORIGINAL_TIMESTAMP_TYPE,
        record.timestampType().toString().getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_FQCN,
        throwable.getClass().getName().getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_MESSAGE,
        throwable.getMessage().getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_STACKTRACE,
        getStackTraceAsString(throwable).getBytes(StandardCharsets.UTF_8)));
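Since all of these are written as raw byte[] record headers, the consuming side has to reverse the same encoding: UTF-8 bytes for the String-valued headers, and int/long bytes (via ByteBuffer) for the partition, offset, and timestamps, as in the sketch above.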

Upvotes: 1
