Marcin Szulc

Reputation: 1271

WebClient - how to get request body?

I've started using WebClient and I'm adding logging of requests/responses using the filter method:

WebClient.builder()
    .baseUrl(properties.getEndpoint())
    .filter((request, next) -> {
        // logging
        request.body()
    })
    .build();

I'm able to access the URL, HTTP method and headers, but I have a problem getting the raw request body, as the body() method of the request returns a BodyInserter (BodyInserter<?, ? super ClientHttpRequest> body()).

How can I convert a BodyInserter to a String representation of the request body? Alternatively, how can I properly log the whole request/response while still being able to hash potential credentials in it?

Upvotes: 22

Views: 29091

Answers (7)

Godfried

Reputation: 1

In just a few lines this will log everything:

 import reactor.netty.tcp.TcpClient;
 import reactor.netty.resources.ConnectionProvider;

 TcpClient tcpClient = TcpClient.create(ConnectionProvider.newConnection());
 WebClient webClient = WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient).wiretap(true)))
            .filter(...

Upvotes: 0

Dave Ankin

Reputation: 1311

Consider using an exchange filter function that wraps the request object and intercepts the body as it is written, buffering it and handing it to a callback once the body is assembled. Apparently this is not something the Spring team wants to support in the library itself, since buffering goes against the async paradigm, but here is an example of such an exchange filter function:

import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.client.reactive.ClientHttpRequestDecorator;
import org.springframework.lang.NonNull;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import reactor.core.publisher.Mono;

import java.util.function.BiConsumer;


class BodyBufferingExchangeFilterFunction implements ExchangeFilterFunction {

    private BiConsumer<ClientHttpRequest, DataBuffer> callback;

    {
        // e.g.
        callback = (ClientHttpRequest req, DataBuffer body) -> {
            // set the content length
            req.getHeaders().setContentLength(body.readableByteCount());
        };
    }

    public BodyBufferingExchangeFilterFunction(BiConsumer<ClientHttpRequest, DataBuffer> callback) {
        this.callback = callback;
    }

    @Override
    @NonNull
    public Mono<ClientResponse> filter(@NonNull ClientRequest request,
                                       @NonNull ExchangeFunction next) {
        if (request.method() == HttpMethod.PUT ||
                request.method() == HttpMethod.POST) {
            ClientRequest buffered = ClientRequest.from(request)
                    .body((ClientHttpRequest outputMessage, BodyInserter.Context context) ->
                            request.body().insert(new BufferingDecorator(outputMessage, callback), context))
                    .build();
            return next.exchange(buffered);
        } else {
            return next.exchange(request);
        }
    }

    private static final class BufferingDecorator extends ClientHttpRequestDecorator {
        private final BiConsumer<ClientHttpRequest, DataBuffer> callback;

        public BufferingDecorator(ClientHttpRequest outputMessage,
                                  BiConsumer<ClientHttpRequest, DataBuffer> callback) {
            super(outputMessage);
            this.callback = callback;
        }

        @Override
        @NonNull
        public Mono<Void> writeWith(@NonNull Publisher<? extends DataBuffer> body) {
            return DataBufferUtils.join(body).flatMap(buffer -> {
                if (callback != null) callback.accept(this, buffer);
                return super.writeWith(Mono.just(buffer));
            });
        }

    }

}

Here's how you'd use it with a webclient too:

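// callback invoked once the body has been buffered (here: set Content-Length)
BiConsumer<ClientHttpRequest, DataBuffer> callback =
        (req, body) -> req.getHeaders().setContentLength(body.readableByteCount());

// new webclient
WebClient webClient = WebClient.builder()
        .filter(new BodyBufferingExchangeFilterFunction(callback))
        .build();

// or, starting from an existing webclient
// WebClient webClient = getWebClientSomeHow().mutate()
//         .filter(new BodyBufferingExchangeFilterFunction(callback))
//         .build();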

ResponseEntity<String> somePost = webClient.post()
        .body(Mono.just("someData"), String.class)
        .retrieve().toEntity(String.class)
        .block();

Upvotes: 1

Pavel Kolmykov

Reputation: 401

Tried all the answers, but some of them don't fit my needs or just don't work. Wrote my own solution based on these answers to intercept request/response bodies and log them.

@Slf4j
@Component
public class LoggingCustomizer implements WebClientCustomizer {

    @Override public void customize(WebClient.Builder webClientBuilder) {
        webClientBuilder.filter((request, next) -> {
            logRequest(request);
            return next
                .exchange(interceptBody(request))
                .doOnNext(this::logResponse)
                .map(this::interceptBody);
        });
    }

    private ClientRequest interceptBody(ClientRequest request) {
        return ClientRequest.from(request)
            .body((outputMessage, context) -> request.body().insert(new ClientHttpRequestDecorator(outputMessage) {
                @Override public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                    // Flux.from keeps every buffer; Mono.from would cancel after the first one
                    return super.writeWith(Flux.from(body)
                        .doOnNext(dataBuffer -> logRequestBody(dataBuffer)));
                }
            }, context))
            .build();
    }

    private ClientResponse interceptBody(ClientResponse response) {
        return response.mutate()
            .body(data -> data.doOnNext(this::logResponseBody))
            .build();
    }

    private void logRequest(ClientRequest request) {
        log.debug("DOWNSTREAM REQUEST: METHOD {}, URI: {}, HEADERS: {}", request.method(), request.url(), request.headers());
    }

    private void logRequestBody(DataBuffer dataBuffer) {
        log.debug("DOWNSTREAM REQUEST: BODY: {}", dataBuffer.toString(StandardCharsets.UTF_8));
    }

    private void logResponse(ClientResponse response) {
        log.debug("DOWNSTREAM RESPONSE: STATUS: {}, HEADERS: {}", response.rawStatusCode(), response.headers().asHttpHeaders());
    }

    private void logResponseBody(DataBuffer dataBuffer) {
        log.debug("DOWNSTREAM RESPONSE: BODY: {}", dataBuffer.toString(StandardCharsets.UTF_8));
    }

}
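One caveat with logging each buffer separately via toString(UTF_8): a multi-byte character can be split across two buffers and end up garbled in the log. Below is a minimal sketch of a stateful decoder that carries the incomplete bytes over to the next chunk. It uses only the JDK; the class name ChunkedUtf8Decoder is made up for illustration, and it takes plain byte arrays, so you would still need to copy the bytes out of each DataBuffer yourself.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes UTF-8 chunk by chunk without breaking split characters.
final class ChunkedUtf8Decoder {

    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
            .onMalformedInput(CodingErrorAction.REPLACE)
            .onUnmappableCharacter(CodingErrorAction.REPLACE);

    // Trailing bytes of an incomplete character from the previous chunk.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    String decode(byte[] chunk) {
        // Prepend the leftover bytes to the new chunk.
        ByteBuffer in = ByteBuffer.allocate(pending.remaining() + chunk.length);
        in.put(pending);
        in.put(chunk);
        in.flip();

        // UTF-8 never produces more chars than it consumes bytes.
        CharBuffer out = CharBuffer.allocate(in.remaining());
        decoder.decode(in, out, false); // false: more input may follow

        // Whatever the decoder could not consume yet is kept for the next call.
        pending = ByteBuffer.allocate(in.remaining());
        pending.put(in);
        pending.flip();

        out.flip();
        return out.toString();
    }

    public static void main(String[] args) {
        ChunkedUtf8Decoder decoder = new ChunkedUtf8Decoder();
        byte[] bytes = "caf\u00e9 au lait".getBytes(StandardCharsets.UTF_8);
        // Split in the middle of the two-byte character.
        System.out.print(decoder.decode(java.util.Arrays.copyOfRange(bytes, 0, 4)));
        System.out.println(decoder.decode(java.util.Arrays.copyOfRange(bytes, 4, bytes.length)));
    }
}
```

Keep one decoder instance per request or response body; it is stateful and not thread-safe.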

Update: added a snippet to log using reactor.netty.http.client.HttpClient (the preferable solution)

@Slf4j
@Component
public class LoggingCustomizer implements WebClientCustomizer {

    @Override public void customize(WebClient.Builder webClientBuilder) {
        HttpClient httpClient = HttpClient.create()
            .doOnRequest((httpClientRequest, connection) -> connection.addHandlerFirst(new LoggingHandler()));
        webClientBuilder.clientConnector(new ReactorClientHttpConnector(httpClient));
    }

    private static class LoggingHandler extends ChannelDuplexHandler {

        @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            if (msg instanceof FullHttpRequest request) {
                log.debug("DOWNSTREAM  REQUEST: METHOD: {}, URI: {}, BODY: {}, HEADERS: {}",
                    request.method(), request.uri(), request.content().toString(defaultCharset()), request.headers());
            } else if (msg instanceof HttpRequest request) {
                log.debug("DOWNSTREAM  REQUEST: METHOD: {}, URI: {}, HEADERS: {}",
                    request.method(), request.uri(), request.headers());
            } else if (msg instanceof FullHttpMessage message) {
                log.debug("DOWNSTREAM  REQUEST: BODY: {}",
                    message.content().toString(defaultCharset()));
            }
            super.write(ctx, msg, promise);
        }

        @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof FullHttpResponse response) {
                log.debug("DOWNSTREAM RESPONSE: STATUS: {}, BODY: {}, HEADERS: {}",
                    response.status().code(), response.content().toString(defaultCharset()), response.headers());
            } else if (msg instanceof HttpResponse response) {
                log.debug("DOWNSTREAM RESPONSE: STATUS: {}, HEADERS: {}",
                    response.status().code(), response.headers());
            } else if (!(msg instanceof LastHttpContent) && msg instanceof HttpContent httpContent) {
                log.debug("DOWNSTREAM RESPONSE: BODY: {}",
                    httpContent.content().toString(defaultCharset()));
            }
            super.channelRead(ctx, msg);
        }
    }

}
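Note that the snippets above log headers as-is, which would include things like Authorization or Cookie values. A minimal sketch of masking them before logging, assuming you first copy the headers into a plain Map<String, List<String>> (Spring's HttpHeaders is such a map). The class name HeaderMasker and the list of sensitive header names are assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: returns a copy of the headers that is safe to log.
final class HeaderMasker {

    // Assumed set of sensitive header names (lower case); extend as needed.
    private static final Set<String> SENSITIVE =
            Set.of("authorization", "proxy-authorization", "cookie", "set-cookie");

    static Map<String, List<String>> mask(Map<String, List<String>> headers) {
        Map<String, List<String>> masked = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
            // Header names are case-insensitive, so compare in lower case.
            String name = entry.getKey().toLowerCase(Locale.ROOT);
            if (SENSITIVE.contains(name)) {
                masked.put(entry.getKey(), List.of("***"));
            } else {
                masked.put(entry.getKey(), List.copyOf(entry.getValue()));
            }
        }
        return masked;
    }

    public static void main(String[] args) {
        Map<String, List<String>> headers = new LinkedHashMap<>();
        headers.put("Authorization", List.of("Bearer some-token"));
        headers.put("Accept", List.of("application/json"));
        System.out.println(mask(headers));
    }
}
```

The original map is left untouched, so masking only affects what ends up in the log.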

Upvotes: 12

Marcin Szulc

Reputation: 1271

Coming back to the topic with an answer I am satisfied with so far.

In the following example I created an HttpClient with a LoggingHandler (Netty's io.netty.handler.logging.LoggingHandler) that does the logging magic, injected it into a ReactorClientHttpConnector (the default connector, see DefaultWebClientBuilder#initConnector) and then passed that to the WebClient.

val baseHttpClient = HttpClient.create()
    .doOnRequest((request, conn) ->
        conn.addHandlerFirst(new LoggingHandler(LogLevel.INFO)));

val httpClient = WebClient.builder()
    .baseUrl("https://google.pl")
    .clientConnector(new ReactorClientHttpConnector(baseHttpClient))
    .build();

val response = httpClient.post()
    .body(Mono.just("Your request body"), String.class)
    .exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class))
    .block();

I'm still planning on creating a custom LoggingHandler which will sanitise and simplify logs.
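For the sanitising part, here is a rough sketch of what the body handling could look like, assuming flat JSON bodies and a fixed list of sensitive keys. It is plain JDK code, independent of the handler itself; the class name BodySanitizer and the key names are made up for illustration. The regex does not handle escaped quotes inside values, and a truncated unsalted hash is only meant for correlating log lines, not as a security guarantee.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: replaces credential values in a JSON body with a short hash.
final class BodySanitizer {

    // Assumed sensitive keys; matches "key" : "value" pairs with string values.
    private static final Pattern SENSITIVE = Pattern.compile(
            "\"(password|token|secret)\"\\s*:\\s*\"([^\"]*)\"");

    static String sanitize(String body) {
        Matcher matcher = SENSITIVE.matcher(body);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            String replacement = "\"" + matcher.group(1) + "\":\"sha256:"
                    + fingerprint(matcher.group(2)) + "\"";
            // quoteReplacement keeps '$' and '\' in the replacement literal.
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    // First 4 bytes of the SHA-256 digest, as 8 hex characters.
    static String fingerprint(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (int i = 0; i < 4; i++) {
                hex.append(String.format("%02x", digest[i] & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required to be present on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sanitize("{\"user\":\"bob\",\"password\":\"hunter2\"}"));
    }
}
```

The same value always maps to the same fingerprint, so you can still tell from the logs whether two requests used the same credential.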

Upvotes: 4

Oleg

Reputation: 491

The request body can be accessed when the BodyInserter writes to the ReactiveHttpOutputMessage. So create an ExchangeFilterFunction that builds a new request from the existing one but sets a new BodyInserter as the body, overriding its insert method; see the example below. Response and request payloads can be read multiple times since they are buffered in DataBuffers.

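public class TracingExchangeFilterFunction implements ExchangeFilterFunction {

    @Override
    public Mono<ClientResponse> filter(final ClientRequest request, final ExchangeFunction next) {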
        return next.exchange(buildTraceableRequest(request))
                .flatMap(response ->
                        response.body(BodyExtractors.toDataBuffers())
                                .next()
                                .doOnNext(dataBuffer -> traceResponse(response, dataBuffer))
                                .thenReturn(response)) ;
    }

    private ClientRequest buildTraceableRequest( 
            final ClientRequest clientRequest) {
        return ClientRequest.from(clientRequest).body(
                new BodyInserter<>() {
                    @Override
                    public Mono<Void> insert(
                            final ClientHttpRequest outputMessage,
                            final Context context) {
                        return clientRequest.body().insert(
                                new ClientHttpRequestDecorator(outputMessage) {
                                    @Override
                                    public Mono<Void> writeWith(final Publisher<? extends DataBuffer> body) {
                                        return super.writeWith(
                                                Flux.from(body).doOnNext(buffer ->
                                                        traceRequest(clientRequest, buffer)));
                                    }
                                }, context);
                    }
                }).build();
    }

    private void traceRequest(ClientRequest clientRequest, DataBuffer buffer) {
        final ByteBuf byteBuf = NettyDataBufferFactory.toByteBuf(buffer);
        final byte[] bytes = ByteBufUtil.getBytes(byteBuf);
        // do some tracing e.g. new String(bytes)
    }


    private void traceResponse(ClientResponse response, DataBuffer dataBuffer) {
        final byte[] bytes = new byte[dataBuffer.readableByteCount()];
        dataBuffer.read(bytes);
        // do some tracing e.g. new String(bytes)
    }
}

Upvotes: 2

rewolf

Reputation: 5891

You can create your own wrapper/proxy class around the JSON encoder and intercept the serialized body before it is sent into the intertubes.

This blog post shows how to log the JSON payloads of WebClient requests and responses

Specifically, you would override the encodeValue method (or encodeValues in the case of streaming data) of Jackson2JsonEncoder. Then you can do with that data what you wish, such as logging. You could even do this conditionally based on environment/profile.

This custom logging-encoder can be specified when creating the WebClient, by codecs:

 CustomBodyLoggingEncoder bodyLoggingEncoder = new CustomBodyLoggingEncoder();
 WebClient.builder()
          .codecs(clientDefaultCodecsConfigurer -> {
             clientDefaultCodecsConfigurer.defaultCodecs().jackson2JsonEncoder(bodyLoggingEncoder);
             clientDefaultCodecsConfigurer.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(new ObjectMapper(), MediaType.APPLICATION_JSON));
          })
          ...

Update 2020/7/3:

Here is a rushed example applying the same principle but for a decoder:

public class LoggingJsonDecoder extends Jackson2JsonDecoder {
    private final Consumer<byte[]> payloadConsumer;

    public LoggingJsonDecoder(final Consumer<byte[]> payloadConsumer) {
        this.payloadConsumer = payloadConsumer;
    }

    @Override
    public Mono<Object> decodeToMono(final Publisher<DataBuffer> input, final ResolvableType elementType, final MimeType mimeType, final Map<String, Object> hints) {
        // Buffer for bytes from each published DataBuffer
        final ByteArrayOutputStream payload = new ByteArrayOutputStream();

        // Augment the Flux, and intercept each group of bytes buffered
        final Flux<DataBuffer> interceptor = Flux.from(input)
                                                 .doOnNext(buffer -> bufferBytes(payload, buffer))
                                                 .doOnComplete(() -> payloadConsumer.accept(payload.toByteArray()));

        // Return the original method, giving our augmented Publisher
        return super.decodeToMono(interceptor, elementType, mimeType, hints);
    }

    private void bufferBytes(final ByteArrayOutputStream bao, final DataBuffer buffer) {
        try {
            bao.write(ByteUtils.extractBytesAndReset(buffer));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

You would configure that along with the encoder using the codecs builder method on WebClient. Of course the above only works assuming your data is being deserialized to a Mono, but you can override other methods if you need to. The example hands the raw bytes to a Consumer<byte[]>; you could instead pass in a Consumer<String> for the decoder to send the string to, or just log from there; it's up to you.

A word of warning: in its current form this is going to double your memory usage, as it buffers the entire response. If you can send that byte data off to another process/thread to write to a log file or some output stream (or even a Flux) immediately, you can avoid buffering the whole payload in memory.
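Another way to limit the memory cost, if a truncated log entry is acceptable, is to cap how much of the payload you keep. A minimal sketch using only the JDK; the class name BoundedCapture is made up for illustration and could stand in for the ByteArrayOutputStream in the decoder above. Note that cutting at a byte limit can split a multi-byte character at the end of the preview.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: keeps at most `limit` bytes of a payload for logging.
final class BoundedCapture {

    private final ByteArrayOutputStream kept = new ByteArrayOutputStream();
    private final int limit;
    private long total; // bytes seen so far, including the ones not kept

    BoundedCapture(int limit) {
        this.limit = limit;
    }

    void accept(byte[] chunk) {
        total += chunk.length;
        int room = limit - kept.size();
        if (room > 0) {
            // Keep only as much of this chunk as still fits under the limit.
            kept.write(chunk, 0, Math.min(room, chunk.length));
        }
    }

    String preview() {
        String text = new String(kept.toByteArray(), StandardCharsets.UTF_8);
        return total > limit ? text + "... (" + total + " bytes total)" : text;
    }

    public static void main(String[] args) {
        BoundedCapture capture = new BoundedCapture(5);
        capture.accept("hello ".getBytes(StandardCharsets.UTF_8));
        capture.accept("world".getBytes(StandardCharsets.UTF_8));
        System.out.println(capture.preview());
    }
}
```

With this, the extra memory per response is bounded by the limit regardless of how large the payload is.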

Upvotes: 2

Justice

Reputation: 464

Try setting the following properties:

logging.level.org.springframework.web.reactive.function.client.ExchangeFunctions=TRACE
logging.level.reactor.netty.http.client.HttpClient=DEBUG
spring.http.log-request-details=true

Upvotes: 0
