AntonK

Reputation: 109

Add trace_id in kafka producer callback

I need a way to add the tracing ID from Spring Cloud Sleuth to the logs written in a Kafka producer callback.

I added the Sleuth starter to my POM and created a Kafka producer. I'm looking for a way to include the current trace_id in the log lines written from the send callback.

ListenableFuture<SendResult<K, V>> send = kafkaTemplate.send(topic, data);
send.addCallback(
    new ListenableFutureCallback<>() {
      @Override
      public void onFailure(Throwable throwable) {
        log.error(
            MessageFormat.format(
                "Error when sending message {0} to Kafka", data.getGlobalUUID()),
            throwable);
        deferredResult.setErrorResult(
            new MarkusKafkaInputException(
                MessageFormat.format(
                    "Error proceed {0} with message: {1}",
                    data.getGlobalUUID(), throwable.getMessage())));
      }

      @Override
      public void onSuccess(SendResult<K, V> result) {
        log.trace(result.toString());
      }
    });

The Sleuth dependency in the POM:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
            <version>2.1.2.RELEASE</version>
        </dependency>

I expect the trace_id to appear in the log lines written inside the callback. Or is there no way to get the trace_id asynchronously?

Upvotes: 0

Views: 1201

Answers (1)

AntonK

Reputation: 109

I found a way to do it, but I'm not sure it is correct:

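// Span and Tracer are brave.Span and brave.Tracer (Sleuth 2.x is built on Brave).
private static class TracingCallback<K, V> implements ListenableFutureCallback<SendResult<K, V>> {

  private final Span span;     // span captured on the thread that called send()
  private final Tracer tracer;

  private TracingCallback(Span span, Tracer tracer) {
    this.span = span;
    this.tracer = tracer;
  }

  @Override
  public void onFailure(Throwable throwable) {
    // Put the captured span back in scope so log lines carry its trace_id.
    try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
      log.error("test111"); // placeholder message
    } finally {
      // Finishes the captured span; whether that is appropriate depends on who owns it.
      span.finish();
    }
  }

  @Override
  public void onSuccess(SendResult<K, V> kvSendResult) {
    try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
      log.error("test111"); // placeholder message
    } finally {
      span.finish();
    }
  }
}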

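and use it in the calling code like this:

@Autowired private Tracer tracer;

...

ListenableFuture<SendResult<K, V>> send = kafkaTemplate.send(topic, data);
// tracer.currentSpan() can return null when no span is in scope on this thread.
send.addCallback(new TracingCallback<>(tracer.currentSpan(), tracer));
...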
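
As far as I understand it, this works because the trace context is bound to the thread that calls send(), while the callback runs later on a different thread, so the context has to be captured up front and restored inside the callback. Below is a minimal sketch of that capture-and-restore idea using only the JDK. It is a model, not Sleuth or Brave code: `CURRENT_TRACE_ID`, `logLine`, `withCapturedTrace` and the trace id `abc123` are hypothetical names made up for illustration, and a plain `ThreadLocal` stands in for the real trace context.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class TraceContextDemo {

    // Hypothetical stand-in for the thread-bound trace context (not a Sleuth or Brave type).
    static final ThreadLocal<String> CURRENT_TRACE_ID = new ThreadLocal<>();

    // Builds a log line that includes whatever trace id is bound to the current thread.
    static String logLine(String message) {
        return "[trace_id=" + CURRENT_TRACE_ID.get() + "] " + message;
    }

    // Captures the caller's trace id now and restores it around the callback later.
    static Callable<String> withCapturedTrace(Callable<String> callback) {
        final String captured = CURRENT_TRACE_ID.get(); // read on the calling thread
        return () -> {
            String previous = CURRENT_TRACE_ID.get();   // read on the worker thread
            CURRENT_TRACE_ID.set(captured);
            try {
                return callback.call();
            } finally {
                // Put back whatever the worker thread had before the callback ran.
                if (previous == null) {
                    CURRENT_TRACE_ID.remove();
                } else {
                    CURRENT_TRACE_ID.set(previous);
                }
            }
        };
    }

    static String[] runDemo() throws Exception {
        // Single worker thread playing the role of the thread that completes the send.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CURRENT_TRACE_ID.set("abc123"); // trace id of the "request" thread
        try {
            Callable<String> plain = () -> logLine("sent");
            Callable<String> traced = withCapturedTrace(plain);
            String withoutCapture = worker.submit(plain).get();
            String withCapture = worker.submit(traced).get();
            return new String[] {withoutCapture, withCapture};
        } finally {
            CURRENT_TRACE_ID.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        for (String line : runDemo()) {
            System.out.println(line);
        }
        // prints:
        // [trace_id=null] sent
        // [trace_id=abc123] sent
    }
}
```

The first line shows what happens in the original callback (no trace id on the worker thread), the second what the TracingCallback above achieves by carrying the span along explicitly.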

Upvotes: 1
