Reputation: 11
I am using micrometer tracing in my springboot application. My applicationhas an Api that send the data to a kafka topic and later consumes it. I want my application to have the same traceId throughout for one request. But my traceId is changing after consuming the data from kafka topic.
I have enabled the observation on kafka template and listener.
"spring.kafka.listener.observation-enabled": "true", "spring.kafka.template.observation-enabled": "true",
Upvotes: 1
Views: 660
Reputation: 37
My goal was to pass one trace-id along the chain of data consumers for each request. The chain is something like this:
client->service 1 (REST API )->Kafka topic->service2->service3->etc
I did this using MDC and headers in Kafka messages.
Necessary dependencies for monitoring:
actuator - org.springframework.boot:spring-boot-starter-actuator
zipkin - io.zipkin.reporter2:zipkin-reporter-brave
micrometer-core - io.micrometer:micrometer-tracing
micrometer-adapter - io.micrometer:micrometer-tracing-bridge-brave
Micrometer automatically generates trace-id and span-id for every REST API requests. If you need trace-id and span-id for another methods (scheduled or etc) you can generate it and put in MDC like this:
MDC.put("traceId", UUID.randomUUID().toString())
MDC.put("spanId", UUID.randomUUID().toString())
...
MDC.clear()
To get automatically generated trace-id by micrometer I wrote kotlin function:
fun getTraceIdFromMdc(): String {
runCatching { return MDC.get("traceId") }
.onFailure { MDC.put("traceId", UUID.randomUUID().toString()) }
return MDC.get("traceId")
}
So I get automatically generated trace-id or generate it randomly if it is not exist. I put it in kafka header and send to topic:
fun sendEvent(topic: String, key: String, data: Any): CompletableFuture<SendResult<String, String>> {
val headers: MutableList<Header> = mutableListOf(RecordHeader("traceId", getTraceIdFromMdc().toByteArray()))
val jsonData = objectMapper.writeValueAsString(data)
return kafkaTemplate.send(ProducerRecord(topic, null, key, jsonData, headers))
In consumer get this header and put in MDC:
fun receiveEvent(cr: ConsumerRecord<String, String>, ack: Acknowledgment) {
val traceId = cr.headers().lastHeader("traceId")?.value()?.toString(StandardCharsets.UTF_8) ?: UUID.randomUUID().toString()
MDC.put("traceId", traceId)
// your logic here
ack.acknowledge()
MDC.clear()
}
And don't forget to configure your logback-spring.xml like this:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/base.xml"/>
<springProfile name="!prod">
<root level="INFO">
<appender-ref ref="CONSOLE"/>
</root>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level [%X{traceId:-},%X{spanId:-}] %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
</springProfile>
</configuration>
Upvotes: 0