Alka Kumari
Alka Kumari

Reputation: 11

How to pass the same traceId after consuming from Kafka using micrometer in springboot application

I am using micrometer tracing in my springboot application. My applicationhas an Api that send the data to a kafka topic and later consumes it. I want my application to have the same traceId throughout for one request. But my traceId is changing after consuming the data from kafka topic.

I have enabled the observation on kafka template and listener.

"spring.kafka.listener.observation-enabled": "true", "spring.kafka.template.observation-enabled": "true",

Upvotes: 1

Views: 660

Answers (1)

JavaSash
JavaSash

Reputation: 37

My goal was to pass one trace-id along the chain of data consumers for each request. The chain is something like this:

client->service 1 (REST API )->Kafka topic->service2->service3->etc

I did this using MDC and headers in Kafka messages.

Necessary dependencies for monitoring:

actuator - org.springframework.boot:spring-boot-starter-actuator
zipkin - io.zipkin.reporter2:zipkin-reporter-brave
micrometer-core - io.micrometer:micrometer-tracing
micrometer-adapter - io.micrometer:micrometer-tracing-bridge-brave

Micrometer automatically generates trace-id and span-id for every REST API requests. If you need trace-id and span-id for another methods (scheduled or etc) you can generate it and put in MDC like this:

MDC.put("traceId", UUID.randomUUID().toString())
MDC.put("spanId", UUID.randomUUID().toString())
...
MDC.clear()

To get automatically generated trace-id by micrometer I wrote kotlin function:

fun getTraceIdFromMdc(): String {
    runCatching { return MDC.get("traceId") }
        .onFailure { MDC.put("traceId", UUID.randomUUID().toString()) }
    return MDC.get("traceId")
}

So I get automatically generated trace-id or generate it randomly if it is not exist. I put it in kafka header and send to topic:

fun sendEvent(topic: String, key: String, data: Any): CompletableFuture<SendResult<String, String>> {
        val headers: MutableList<Header> = mutableListOf(RecordHeader("traceId", getTraceIdFromMdc().toByteArray()))
        val jsonData = objectMapper.writeValueAsString(data)
        return kafkaTemplate.send(ProducerRecord(topic, null, key, jsonData, headers))

In consumer get this header and put in MDC:

fun receiveEvent(cr: ConsumerRecord<String, String>, ack: Acknowledgment) {
        val traceId = cr.headers().lastHeader("traceId")?.value()?.toString(StandardCharsets.UTF_8) ?: UUID.randomUUID().toString()
        MDC.put("traceId", traceId)
        // your logic here
        ack.acknowledge()
        MDC.clear()
}

And don't forget to configure your logback-spring.xml like this:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/base.xml"/>
    <springProfile name="!prod">
        <root level="INFO">
            <appender-ref ref="CONSOLE"/>
        </root>
        <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>
                    %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level [%X{traceId:-},%X{spanId:-}] %logger{36} - %msg%n
                </pattern>
            </encoder>
        </appender>
    </springProfile>
</configuration>

Upvotes: 0

Related Questions