Stanislav Giley

Reputation: 43

Passing headers from Kafka ConsumerRecord to Kafka ProducerRecord / RestTemplate / Feign

We are planning to use custom headers for testing purposes, and they need to be passed along across our microservices. Our applications use Java/Kotlin with Spring Boot.

Thanks to RequestContextHolder we can easily get the headers from HTTP requests (Servlets) and pass them wherever we want. For example, we add them to every Feign request:

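import feign.RequestInterceptor
import feign.RequestTemplate
import org.springframework.web.context.request.RequestContextHolder
import org.springframework.web.context.request.ServletRequestAttributes

class CustomRequestInterceptor : RequestInterceptor {

    override fun apply(template: RequestTemplate) {
        // Null when no HTTP request is bound to this thread (e.g. a Kafka listener thread).
        val requestAttributes = RequestContextHolder.getRequestAttributes() as? ServletRequestAttributes

        // Copy every incoming header onto the outgoing Feign request.
        requestAttributes?.request?.let { request ->
            request.headerNames.toList()
                .forEach { name -> template.header(name, request.getHeader(name)) }
        }
    }
}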

For Kafka Producers there's ProducerInterceptor.

But is there a way to catch ConsumerRecord's Headers and pass them further?

We planned to use a ThreadLocal variable (like RequestContextHolder does), but we haven't found a reliable place to set and clear the headers in it.

For example, we can store the headers in a RecordInterceptor, but we can't clear the variable when the thread finishes processing the current record. The next record handled by the same thread could then read the previous record's headers, because the ThreadLocal variable was never cleared.

Are there any interfaces/instruments which can help us? Thanks in advance!

Upvotes: 0

Views: 1248

Answers (1)

Gary Russell

Reputation: 174779

Version 2.7 added success and failure methods to the RecordInterceptor.

The upcoming version 2.8 adds afterRecord (which is called after the ErrorHandler).

https://github.com/spring-projects/spring-kafka/blob/aec0c072ec99ecfaffe89265fc128cc523963211/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java#L50-L93
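
To illustrate how those callbacks could be used, here is a minimal, library-free sketch of a thread-bound header holder. RecordHeaderContext and its methods are hypothetical names, not part of spring-kafka, and header values are simplified to String (real Kafka header values are byte arrays). The idea is that intercept would call set, and afterRecord (or success and failure on 2.7) would call clear.

```kotlin
// Hypothetical holder; the name and shape are illustrative, not part of spring-kafka.
object RecordHeaderContext {
    private val current = ThreadLocal<Map<String, String>>()

    // Call when a record starts processing (e.g. from RecordInterceptor.intercept).
    fun set(headers: Map<String, String>) = current.set(headers)

    // Headers of the record being processed on this thread, or an empty map.
    fun get(): Map<String, String> = current.get() ?: emptyMap()

    // Call when processing ends (e.g. from afterRecord, or from success and failure).
    fun clear() = current.remove()
}

fun main() {
    // Simulate one listener thread handling two records in a row.
    RecordHeaderContext.set(mapOf("x-test-id" to "42"))
    println(RecordHeaderContext.get())   // prints {x-test-id=42}
    RecordHeaderContext.clear()

    // The second record carries no custom headers; nothing leaks from the first.
    println(RecordHeaderContext.get())   // prints {}
}
```

A ProducerInterceptor or the Feign RequestInterceptor could then read RecordHeaderContext.get() in the same way the question's code reads from RequestContextHolder.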

Another alternative is to add a MethodInterceptor to the container's advice chain.

https://github.com/spring-projects/spring-kafka/blob/aec0c072ec99ecfaffe89265fc128cc523963211/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java#L847-L852
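
A rough sketch of the advice-chain alternative follows, assuming a record (not batch) listener whose first call argument is the ConsumerRecord. RecordHeadersAdvice and currentHeaders are hypothetical names. The try/finally clears the ThreadLocal whether the listener succeeds or throws.

```kotlin
import org.aopalliance.intercept.MethodInterceptor
import org.aopalliance.intercept.MethodInvocation
import org.apache.kafka.clients.consumer.ConsumerRecord

// Hypothetical advice; class and member names are illustrative.
class RecordHeadersAdvice : MethodInterceptor {

    override fun invoke(invocation: MethodInvocation): Any? {
        // Assumption: the first argument of the listener call is the ConsumerRecord.
        // A batch listener would receive a list of records instead.
        val record = invocation.arguments.firstOrNull() as? ConsumerRecord<*, *>
        if (record != null) {
            CURRENT.set(record.headers().associate { it.key() to it.value() })
        }
        try {
            return invocation.proceed()
        } finally {
            // Runs on success and on failure, so a reused thread never keeps stale headers.
            CURRENT.remove()
        }
    }

    companion object {
        private val CURRENT = ThreadLocal<Map<String, ByteArray>>()

        // Headers of the record currently being processed on this thread, or an empty map.
        fun currentHeaders(): Map<String, ByteArray> = CURRENT.get() ?: emptyMap()
    }
}
```

The advice would be registered with something like containerProperties.setAdviceChain(RecordHeadersAdvice()). Since it wraps only the listener call, the headers are presumably already cleared by the time an error handler runs; that is worth verifying if the error handler publishes records that need them.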

Upvotes: 1
