phanteh

Reputation: 517

Maintain correlationId through RabbitMQ

I've been looking at using RabbitMQ for cross-service messaging. I've been able to configure our Exchanges / Queues / DLX etc. using Spring annotations. Example (simple) queue listener:

@RabbitListener(queues = RabbitMessageType.QueueNames.SMS_NOTIFICATIONS)
public void receive1(Message message) throws Exception {
    RabbitMessageDto messageDto = OBJECT_MAPPER.readValue(message.getBody(), RabbitMessageDto.class);
    SmsNotificationDto payload = OBJECT_MAPPER.readValue(messageDto.getPayload(), SmsNotificationDto.class);
    log.info(payload.getMessage());
}

I'm using spring-cloud-sleuth to generate correlationIds / traceIds, which are preserved when using HTTP requests to talk to other services, enabling us to trace the given ID throughout the logs of our various microservices.

While I can get the current traceId and insert that into my DTO:

@Autowired
private Tracer tracer;

private RabbitMessageDto createRabbitMessageWithPayload(String messageType, 
                                                        String messageVersion, 
                                                        Object payload) {
    return new RabbitMessageDto.Builder()
        .withTraceId(tracer.getCurrentSpan().getTraceId())
        .withDtoName(messageType)
        .withDtoVersion(messageVersion)
        .withPayload(payload)
        .build();
}

I cannot find a way to set the traceId in the receiving method.

Googling keeps bringing me to spring-cloud-stream and spring-cloud-stream-starter-rabbit. The documentation seems to indicate that it's possible to insert / set traceIds automatically, but I'm not familiar with spring-cloud-stream at all, and I don't find the documentation particularly helpful.

So, I would love answers to the following:

Upvotes: 1

Views: 2081

Answers (1)

phanteh

Reputation: 517

So, in case someone comes across this looking to set the sleuth traceId context, we came up with the following solution:

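@Autowired Tracer tracer;

private void someMethod(long traceId) {
    // Build a span that reuses the incoming traceId and gets a fresh random spanId
    Span span = Span.builder()
        .traceId(traceId)
        .spanId(new Random().nextLong())
        .build();
    tracer.continueSpan(span);
    try {
        // do work
    } finally {
        // Always close the span, even if the work throws
        tracer.close(span);
    }
}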

Note that the documentation says a span should be closed once you've finished with it, so the "do work" section should sit inside a try / finally block that guarantees the close happens even when an exception is thrown.

Any methods called with the span still open will inherit the traceId.
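To illustrate that inheritance, here is a minimal plain-Java sketch of the idea, assuming the current trace is held in a thread-bound variable. TraceContextDemo, runWithTrace, currentTraceId and describe are hypothetical names made up for this example. It is not Sleuth's implementation, only a model of why nested calls on the same thread see the traceId without it being passed as a parameter.

```java
class TraceContextDemo {
    // Hypothetical stand-in for a thread-bound "current span"; not Sleuth's real code.
    private static final ThreadLocal<Long> CURRENT_TRACE_ID = new ThreadLocal<>();

    /** Returns the trace ID bound to this thread, or null when no span is open. */
    static Long currentTraceId() {
        return CURRENT_TRACE_ID.get();
    }

    /** Binds traceId while work runs, then restores whatever was bound before. */
    static void runWithTrace(long traceId, Runnable work) {
        Long previous = CURRENT_TRACE_ID.get();
        CURRENT_TRACE_ID.set(traceId); // plays the role of continuing the span
        try {
            work.run();
        } finally {
            // Plays the role of closing the span; runs even if work throws.
            if (previous == null) {
                CURRENT_TRACE_ID.remove();
            } else {
                CURRENT_TRACE_ID.set(previous);
            }
        }
    }

    /** A nested call that never receives the trace ID as an argument. */
    static String describe() {
        return "traceId=" + currentTraceId();
    }

    public static void main(String[] args) {
        runWithTrace(42L, () -> System.out.println(describe())); // prints traceId=42
        System.out.println(describe());                          // prints traceId=null
    }
}
```

The second call prints null because the binding is removed in the finally block, which is the same reason the real span needs closing: otherwise later, unrelated work on that thread would pick up a stale traceId.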

EDIT

I should also say that the better solution seems to be replacing the Spring AMQP library with spring-cloud-stream. From what I can tell, this should automatically include the traceId in RabbitMQ messages (as the correlationId) and set it at the other end. I haven't had the opportunity to test this, however.

Upvotes: 1
