Reputation: 1271
I've started using WebClient and I'm adding logging of request/response using filter method:
WebClient.builder()
.baseUrl(properties.getEndpoint())
.filter((request, next) -> {
// logging
request.body(); // returns a BodyInserter, not the raw body
return next.exchange(request);
})
.build();
I can access the URL, HTTP method and headers, but I have a problem getting the raw request body, because the body() method of the request returns a BodyInserter (BodyInserter<?, ? super ClientHttpRequest> body()).
How can I convert a BodyInserter to a String representation of the request body? Alternatively, how can I properly log the whole request/response while also being able to hash potential credentials in it?
Upvotes: 22
Views: 29091
Reputation: 1
In just a few lines this will log everything:
import reactor.netty.tcp.TcpClient;
import reactor.netty.resources.ConnectionProvider;
TcpClient tcpClient = TcpClient.create(ConnectionProvider.newConnection());
WebClient webClient = WebClient.builder().clientConnector(new
ReactorClientHttpConnector(HttpClient.from(tcpClient).wiretap(true)))
.filter(...
Upvotes: 0
Reputation: 1311
Consider using an exchange filter function that wraps the request object and intercepts the body as it is written: it buffers the body and hands it to a callback once the body is assembled. Apparently this is not something the Spring team wants to support in the library itself, since buffering goes against the asynchronous paradigm, but here is an example of such an exchange filter function:
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.client.reactive.ClientHttpRequestDecorator;
import org.springframework.lang.NonNull;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import reactor.core.publisher.Mono;
import java.util.function.BiConsumer;
class BodyBufferingExchangeFilterFunction implements ExchangeFilterFunction {
private final BiConsumer<ClientHttpRequest, DataBuffer> callback;
// No-arg constructor with a default callback,
// e.g. set the content length once the body has been buffered
public BodyBufferingExchangeFilterFunction() {
this((ClientHttpRequest req, DataBuffer body) ->
req.getHeaders().setContentLength(body.readableByteCount()));
}
public BodyBufferingExchangeFilterFunction(BiConsumer<ClientHttpRequest, DataBuffer> callback) {
this.callback = callback;
}
@Override
@NonNull
public Mono<ClientResponse> filter(@NonNull ClientRequest request,
@NonNull ExchangeFunction next) {
if (request.method() == HttpMethod.PUT ||
request.method() == HttpMethod.POST) {
ClientRequest buffered = ClientRequest.from(request)
.body((ClientHttpRequest outputMessage, BodyInserter.Context context) ->
request.body().insert(new BufferingDecorator(outputMessage, callback), context))
.build();
return next.exchange(buffered);
} else {
return next.exchange(request);
}
}
private static final class BufferingDecorator extends ClientHttpRequestDecorator {
private final BiConsumer<ClientHttpRequest, DataBuffer> callback;
public BufferingDecorator(ClientHttpRequest outputMessage,
BiConsumer<ClientHttpRequest, DataBuffer> callback) {
super(outputMessage);
this.callback = callback;
}
@Override
@NonNull
public Mono<Void> writeWith(@NonNull Publisher<? extends DataBuffer> body) {
return DataBufferUtils.join(body).flatMap(buffer -> {
if (callback != null) callback.accept(this, buffer);
return super.writeWith(Mono.just(buffer));
});
}
}
}
Here's how you'd use it with a WebClient:
// new webclient
WebClient webClient = WebClient.builder()
.filter(new BodyBufferingExchangeFilterFunction())
.build();
// existing webclient
WebClient webClient = getWebClientSomeHow();
webClient = webClient.mutate().filter(new BodyBufferingExchangeFilterFunction()).build();
ResponseEntity<String> somePost = webClient.post()
.body(Mono.just("someData"), String.class)
.retrieve().toEntity(String.class)
.block();
Upvotes: 1
Reputation: 401
I tried all the answers, but some of them don't fit my needs or simply don't work. I wrote my own solution, based on these answers, to intercept request/response bodies and log them.
@Slf4j
@Component
public class LoggingCustomizer implements WebClientCustomizer {
@Override public void customize(WebClient.Builder webClientBuilder) {
webClientBuilder.filter((request, next) -> {
logRequest(request);
return next
.exchange(interceptBody(request))
.doOnNext(this::logResponse)
.map(this::interceptBody);
});
}
private ClientRequest interceptBody(ClientRequest request) {
return ClientRequest.from(request)
.body((outputMessage, context) -> request.body().insert(new ClientHttpRequestDecorator(outputMessage) {
@Override public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
// Flux.from keeps every chunk; Mono.from would cancel after the first buffer
return super.writeWith(Flux.from(body)
.doOnNext(dataBuffer -> logRequestBody(dataBuffer)));
}
}, context))
.build();
}
private ClientResponse interceptBody(ClientResponse response) {
return response.mutate()
.body(data -> data.doOnNext(this::logResponseBody))
.build();
}
private void logRequest(ClientRequest request) {
log.debug("DOWNSTREAM REQUEST: METHOD {}, URI: {}, HEADERS: {}", request.method(), request.url(), request.headers());
}
private void logRequestBody(DataBuffer dataBuffer) {
log.debug("DOWNSTREAM REQUEST: BODY: {}", dataBuffer.toString(StandardCharsets.UTF_8));
}
private void logResponse(ClientResponse response) {
log.debug("DOWNSTREAM RESPONSE: STATUS: {}, HEADERS: {}", response.rawStatusCode(), response.headers().asHttpHeaders());
}
private void logResponseBody(DataBuffer dataBuffer) {
log.debug("DOWNSTREAM RESPONSE: BODY: {}", dataBuffer.toString(StandardCharsets.UTF_8));
}
}
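The request logging above prints every header, which may include credentials such as an Authorization value. A minimal sketch of masking them before logging follows. `HeaderMasker` and its list of sensitive names are hypothetical, not part of Spring; it works on a plain `Map<String, List<String>>`, which assumes a Spring version where `HttpHeaders` can be passed as such a map.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not a Spring class): masks credential-bearing headers.
final class HeaderMasker {

    // Assumed set of sensitive header names, compared case-insensitively.
    private static final Set<String> SENSITIVE =
            Set.of("authorization", "proxy-authorization", "cookie", "set-cookie");

    private HeaderMasker() {
    }

    // Returns a copy of the headers in which sensitive values are replaced by "***".
    static Map<String, List<String>> mask(Map<String, List<String>> headers) {
        Map<String, List<String>> masked = new LinkedHashMap<>();
        headers.forEach((name, values) -> masked.put(name,
                SENSITIVE.contains(name.toLowerCase(Locale.ROOT)) ? List.of("***") : values));
        return masked;
    }
}
```

With this in place, logRequest could log `HeaderMasker.mask(request.headers())` instead of the raw headers.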
Update: added a snippet that logs using reactor.netty.http.client.HttpClient (the preferable solution):
@Slf4j
@Component
public class LoggingCustomizer implements WebClientCustomizer {
@Override public void customize(WebClient.Builder webClientBuilder) {
HttpClient httpClient = HttpClient.create()
.doOnRequest((httpClientRequest, connection) -> connection.addHandlerFirst(new LoggingHandler()));
webClientBuilder.clientConnector(new ReactorClientHttpConnector(httpClient));
}
private static class LoggingHandler extends ChannelDuplexHandler {
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof FullHttpRequest request) {
log.debug("DOWNSTREAM REQUEST: METHOD: {}, URI: {}, BODY: {}, HEADERS: {}",
request.method(), request.uri(), request.content().toString(defaultCharset()), request.headers());
} else if (msg instanceof HttpRequest request) {
log.debug("DOWNSTREAM REQUEST: METHOD: {}, URI: {}, HEADERS: {}",
request.method(), request.uri(), request.headers());
} else if (msg instanceof FullHttpMessage message) {
log.debug("DOWNSTREAM REQUEST: BODY: {}",
message.content().toString(defaultCharset()));
}
super.write(ctx, msg, promise);
}
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpResponse response) {
log.debug("DOWNSTREAM RESPONSE: STATUS: {}, BODY: {}, HEADERS: {}",
response.status().code(), response.content().toString(defaultCharset()), response.headers());
} else if (msg instanceof HttpResponse response) {
log.debug("DOWNSTREAM RESPONSE: STATUS: {}, HEADERS: {}",
response.status().code(), response.headers());
} else if (!(msg instanceof LastHttpContent) && msg instanceof HttpContent httpContent) {
log.debug("DOWNSTREAM RESPONSE: BODY: {}",
httpContent.content().toString(defaultCharset()));
}
super.channelRead(ctx, msg);
}
}
}
Upvotes: 12
Reputation: 1271
Coming back to the topic with an answer I am so far satisfied with.
In the following example I created an HttpClient with a LoggingHandler that does the logging magic, injected it into a ReactorClientHttpConnector, which is the default connector (see DefaultWebClientBuilder#initConnector), and then passed that to the WebClient.
val baseHttpClient = HttpClient.create()
.doOnRequest(
(request, conn) -> conn.addHandlerFirst(new LoggingHandler(LogLevel.INFO)
));
val httpClient = WebClient.builder()
.baseUrl("https://google.pl")
.clientConnector(new ReactorClientHttpConnector(baseHttpClient))
.build();
val response = httpClient.post()
.body(Mono.just("Your request body"), String.class)
.exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class))
.block();
I'm still planning to create a custom LoggingHandler which will sanitise and simplify the logs.
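As a rough illustration of the sanitising step, here is a minimal sketch that replaces credential values in a JSON body with a short SHA-256 fingerprint before the text is logged. `LogSanitizer` and the field names it looks for are assumptions made for this example; it is not a Netty or Spring class. The regex is deliberately simple and does not handle escaped quotes inside values.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: hashes credential values found in a JSON string.
final class LogSanitizer {

    // Assumed sensitive field names; adjust to the payloads you actually send.
    private static final Pattern SENSITIVE_FIELD = Pattern.compile(
            "\"(password|token|secret)\"\\s*:\\s*\"([^\"]*)\"",
            Pattern.CASE_INSENSITIVE);

    private LogSanitizer() {
    }

    // Replaces each sensitive value with "sha256:" plus the first 4 digest bytes in hex.
    static String sanitize(String body) {
        Matcher matcher = SENSITIVE_FIELD.matcher(body);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            String replacement = "\"" + matcher.group(1) + "\":\"sha256:"
                    + fingerprint(matcher.group(2)) + "\"";
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    // Hex of the first 4 bytes of the SHA-256 digest of the UTF-8 encoded value.
    static String fingerprint(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (int i = 0; i < 4; i++) {
                hex.append(String.format("%02x", digest[i]));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 should be available on every JVM", e);
        }
    }
}
```

A custom handler could call `LogSanitizer.sanitize(...)` on the decoded body before passing it to the logger. Note that an unsalted hash of a low-entropy secret can still be guessed, so plain masking may be the safer choice for passwords.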
Upvotes: 4
Reputation: 491
The request body can be accessed when the BodyInserter writes to the ReactiveHttpOutputMessage. So create a filter function that builds a new request from the existing one, but sets a new BodyInserter as the body, overriding its insert method; see the example below. Response and request payloads can be read multiple times since they are buffered in DataBuffers.
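public class TracingExchangeFilterFunction implements ExchangeFilterFunction {
@Override
public Mono<ClientResponse> filter(final ClientRequest request, final ExchangeFunction next) {
return next.exchange(buildTraceableRequest(request))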
.flatMap(response ->
response.body(BodyExtractors.toDataBuffers())
.next()
.doOnNext(dataBuffer -> traceResponse(response, dataBuffer))
.thenReturn(response)) ;
}
private ClientRequest buildTraceableRequest(
final ClientRequest clientRequest) {
return ClientRequest.from(clientRequest).body(
new BodyInserter<>() {
@Override
public Mono<Void> insert(
final ClientHttpRequest outputMessage,
final Context context) {
return clientRequest.body().insert(
new ClientHttpRequestDecorator(outputMessage) {
@Override
public Mono<Void> writeWith(final Publisher<? extends DataBuffer> body) {
return super.writeWith(
from(body).doOnNext(buffer ->
traceRequest(clientRequest, buffer)));
}
}, context);
}
}).build();
}
private void traceRequest(ClientRequest clientRequest, DataBuffer buffer) {
final ByteBuf byteBuf = NettyDataBufferFactory.toByteBuf(buffer);
final byte[] bytes = ByteBufUtil.getBytes(byteBuf);
// do some tracing e.g. new String(bytes)
}
private void traceResponse(ClientResponse response, DataBuffer dataBuffer) {
final byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
// do some tracing e.g. new String(bytes)
}
}
Upvotes: 2
Reputation: 5891
You can create your own wrapper/proxy class around the JSON encoder and intercept the serialized body before it is sent into the intertubes.
This blog post shows how to log the JSON payloads of WebClient requests and responses.
Specifically, you would override the encodeValue method (or encodeValues in case of streaming data) of Jackson2JsonEncoder. Then you can do with that data what you wish, such as logging. You could even do this conditionally, based on environment/profile.
This custom logging encoder can be specified when creating the WebClient, via codecs:
CustomBodyLoggingEncoder bodyLoggingEncoder = new CustomBodyLoggingEncoder();
WebClient.builder()
.codecs(clientDefaultCodecsConfigurer -> {
clientDefaultCodecsConfigurer.defaultCodecs().jackson2JsonEncoder(bodyLoggingEncoder);
clientDefaultCodecsConfigurer.defaultCodecs().jackson2JsonDecoder(new Jackson2JsonDecoder(new ObjectMapper(), MediaType.APPLICATION_JSON));
})
...
Update 2020/7/3:
Here is a rushed example applying the same principle but for a decoder:
public class LoggingJsonDecoder extends Jackson2JsonDecoder {
private final Consumer<byte[]> payloadConsumer;
public LoggingJsonDecoder(final Consumer<byte[]> payloadConsumer) {
this.payloadConsumer = payloadConsumer;
}
@Override
public Mono<Object> decodeToMono(final Publisher<DataBuffer> input, final ResolvableType elementType, final MimeType mimeType, final Map<String, Object> hints) {
// Buffer for bytes from each published DataBuffer
final ByteArrayOutputStream payload = new ByteArrayOutputStream();
// Augment the Flux, and intercept each group of bytes buffered
final Flux<DataBuffer> interceptor = Flux.from(input)
.doOnNext(buffer -> bufferBytes(payload, buffer))
.doOnComplete(() -> payloadConsumer.accept(payload.toByteArray()));
// Return the original method, giving our augmented Publisher
return super.decodeToMono(interceptor, elementType, mimeType, hints);
}
private void bufferBytes(final ByteArrayOutputStream bao, final DataBuffer buffer) {
try {
bao.write(ByteUtils.extractBytesAndReset(buffer));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
You would configure that along with the encoder using the codecs builder method on WebClient.
Of course, the above only works if your data is being deserialized to a Mono; override the other methods if you need them. Also, I'm just stdout'ing the resulting JSON there, but you could pass in a Consumer<String> or similar for the decoder to send the string to, or just log from there; it's up to you.
A word of warning: in its current form this is going to double your memory usage, as it buffers the entire response. If you can send that byte data off to another process/thread to write to a log file or some output stream (or even a Flux) immediately, you could avoid buffering the whole payload in memory.
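One other way to limit the memory cost, different from handing the bytes off to another thread, is to keep only a bounded prefix of the payload for logging. The sketch below assumes a hypothetical `BoundedPayloadBuffer` class that could stand in for the ByteArrayOutputStream in the decoder above; it is not part of Spring. Truncation happens at a byte boundary, so a multi-byte UTF-8 character at the cut may decode as a replacement character.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: keeps at most maxBytes of a payload for logging.
final class BoundedPayloadBuffer {

    private final ByteArrayOutputStream kept = new ByteArrayOutputStream();
    private final int maxBytes;
    private long totalBytes;

    BoundedPayloadBuffer(int maxBytes) {
        if (maxBytes < 0) {
            throw new IllegalArgumentException("maxBytes must not be negative");
        }
        this.maxBytes = maxBytes;
    }

    // Counts every chunk, but stores bytes only while there is room left.
    void append(byte[] chunk) {
        totalBytes += chunk.length;
        int room = maxBytes - kept.size();
        if (room > 0) {
            kept.write(chunk, 0, Math.min(room, chunk.length));
        }
    }

    // Returns the kept prefix as UTF-8 text, noting how many bytes were dropped.
    String preview() {
        String text = new String(kept.toByteArray(), StandardCharsets.UTF_8);
        long dropped = totalBytes - kept.size();
        return dropped > 0 ? text + "... (" + dropped + " more bytes)" : text;
    }
}
```

The trade-off is that large payloads are logged only partially, in exchange for a fixed upper bound on the extra memory used per response.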
Upvotes: 2
Reputation: 464
Try setting the following properties:
logging.level.org.springframework.web.reactive.function.client.ExchangeFunctions=TRACE
logging.level.reactor.netty.http.client.HttpClient=DEBUG
spring.http.log-request-details=true
Upvotes: 0