Reputation: 7815
In a test method an instance of org.springframework.web.reactive.function.client.ClientRequest
is received.
I want to verify its HttpMethod, URI and body.
It is quite obvious how to get everything except for the body.
// Obtain the ClientRequest under test (factory arguments are elided in the question).
ClientRequest request = makeInstance(...);
// The HTTP method and the URL are exposed directly by ClientRequest.
assertEquals(HttpMethod.POST, request.method());
assertEquals("somewhere/else", request.url().toString());
// ? unclear how to extract body using the BodyInserter
// The body is not exposed as a value, only as an inserter that writes it into an output message.
BodyInserter<?, ? super ClientHttpRequest> inserter = request.body();
// %outputMessage% and %context% are placeholders for the two arguments the question asks how to build.
inserter.insert(%outputMessage%, %context%);
I have found in sources of Spring how BodyInserters are tested. It is more or less clear how to create a BodyInserter.Context
(the second parameter), but I cannot understand how to construct the first parameter, so the request body could be extracted through it.
Please, show a conventional (or at least usable) way to get request body from a ClientRequest
instance.
Upvotes: 15
Views: 18342
Reputation: 1079
You can get the request body from a ClientRequest inside an ExchangeFilterFunction, as shown in the example below:
Code:
/**
 * Builds the {@link WebClient} bean: the supplied builder is configured with the given
 * connector, the base URL {@code "someUrl"} and {@code myFilter}, and then built.
 *
 * @param myFilter  filter registered on the client
 * @param builder   builder to configure
 * @param connector client connector to use
 * @return the configured client
 * @throws SSLException declared by the original signature
 */
@Bean
public WebClient myWebClient(final MyFilter myFilter, final WebClient.Builder builder, final ReactorClientHttpConnector connector) throws SSLException {
    final WebClient.Builder configured = builder
            .clientConnector(connector)
            .baseUrl("someUrl")
            .filter(myFilter);
    return configured.build();
}
/**
 * Exchange filter that captures the raw bytes of the outgoing request body.
 *
 * <p>The request is first written into a {@link RequestBodyCollector}; only after that write
 * completes is the request handed to the next exchange function. The collected bytes can be
 * inspected once the response arrives.
 *
 * <p>NOTE(review): the request is passed both to {@code writeTo} and to {@code next.exchange},
 * so the body is presumably produced twice; a publisher-backed body would have to support
 * re-subscription. Confirm for the bodies used in your application.
 */
public class MyFilter implements ExchangeFilterFunction {

    /**
     * Collects the request body, performs the exchange and returns its response unchanged.
     *
     * @param request the outgoing request
     * @param next    the next exchange function in the chain
     * @return the response produced by {@code next}
     */
    @Override
    public Mono<ClientResponse> filter(final ClientRequest request, final ExchangeFunction next) {
        var requestBodyCollector = new RequestBodyCollector(request);
        return request.writeTo(requestBodyCollector, ExchangeStrategies.withDefaults())
                // Deferred so that the exchange starts only after the body has been collected.
                .then(Mono.defer(() -> next.exchange(request)))
                // The response is passed through; previously the chain ended in Mono.empty(),
                // which discarded the response so the caller never received it.
                .doOnNext(response -> {
                    // do something with request body
                    var body = requestBodyCollector.getBodyBytes();
                });
    }
}
/**
 * A {@link ClientHttpRequest} that sends nothing: it copies every {@link DataBuffer} written
 * to it into memory so the body bytes can be read back with {@link #getBodyBytes()}.
 *
 * <p>Method and URI are taken from the wrapped {@link ClientRequest}. Headers and cookies are
 * kept in per-instance containers, so values written during body insertion remain readable.
 */
@RequiredArgsConstructor
class RequestBodyCollector implements ClientHttpRequest {

    private final ClientRequest clientRequest;

    /** Body chunks in the order they were written. */
    private final List<byte[]> bodyByteList = new ArrayList<>();

    /** One instance per collector; returning a fresh object per call would lose written headers. */
    private final HttpHeaders headers = new HttpHeaders();

    /** One instance per collector, for the same reason as {@link #headers}. */
    private final MultiValueMap<String, HttpCookie> cookies = new LinkedMultiValueMap<>();

    /**
     * @return all bytes collected so far, concatenated in write order
     */
    public byte[] getBodyBytes() {
        return getBytes(bodyByteList);
    }

    /**
     * Copies each buffer of {@code body} into the collector.
     *
     * @param body the body to collect
     * @return a Mono that completes when {@code body} completes
     */
    @Override
    public Mono<Void> writeWith(final Publisher<? extends DataBuffer> body) {
        return Flux.from(body)
                .map(dataBuffer -> collectBytes(dataBuffer, bodyByteList))
                // collectBytes hands back a retained slice owned by the caller; nothing
                // downstream consumes it, so release it here to avoid leaking pooled buffers.
                .doOnNext(DataBufferUtils::release)
                .then();
    }

    /**
     * Flattens the nested publishers and collects them the same way as {@link #writeWith}.
     *
     * @param body the body to collect
     * @return a Mono that completes when all inner publishers complete
     */
    @Override
    public Mono<Void> writeAndFlushWith(final Publisher<? extends Publisher<? extends DataBuffer>> body) {
        return writeWith(Flux.from(body)
                .flatMap(Function.identity()))
                .doOnDiscard(DataBuffer.class, DataBufferUtils::release);
    }

    /**
     * Copies the readable bytes of {@code dataBuffer} into {@code bodyByteList} and releases
     * {@code dataBuffer}.
     *
     * @param dataBuffer   buffer to drain; it is released by this method
     * @param bodyByteList list that receives the copied bytes
     * @return a retained slice over the bytes that were copied; the caller must release it
     */
    static DataBuffer collectBytes(final DataBuffer dataBuffer, final List<byte[]> bodyByteList) {
        final byte[] bytes = new byte[dataBuffer.readableByteCount()];
        // Slice from the current read position (not index 0) so the slice covers exactly the
        // bytes copied below, even if part of the buffer has already been read.
        final DataBuffer retained = dataBuffer.retainedSlice(dataBuffer.readPosition(), bytes.length);
        dataBuffer.read(bytes);
        DataBufferUtils.release(dataBuffer);
        bodyByteList.add(bytes);
        return retained;
    }

    /**
     * Concatenates the given chunks into one array.
     *
     * @param list chunks to join, in order
     * @return a new array holding all chunks back to back (empty if {@code list} is empty)
     */
    static byte[] getBytes(final List<byte[]> list) {
        final int total = list.stream()
                .mapToInt(bytes -> bytes.length)
                .sum();
        var buffer = ByteBuffer.wrap(new byte[total]);
        list.forEach(buffer::put);
        return buffer.array();
    }

    @Override
    public HttpMethod getMethod() {
        return clientRequest.method();
    }

    @Override
    public URI getURI() {
        return clientRequest.url();
    }

    @Override
    public MultiValueMap<String, HttpCookie> getCookies() {
        return cookies;
    }

    /** There is no underlying native request, so this returns {@code null}. */
    @Override
    public <T> T getNativeRequest() {
        return null;
    }

    @Override
    public DataBufferFactory bufferFactory() {
        return DefaultDataBufferFactory.sharedInstance;
    }

    /** Commit actions are ignored. */
    @Override
    public void beforeCommit(final Supplier<? extends Mono<Void>> action) {
    }

    /** Always reports {@code false}. */
    @Override
    public boolean isCommitted() {
        return false;
    }

    @Override
    public Mono<Void> setComplete() {
        return Mono.empty();
    }

    @Override
    public HttpHeaders getHeaders() {
        return headers;
    }
}
Upvotes: 0
Reputation: 21
A simple wrapper can be implemented to hold the body object and expose it through a getter:
/**
 * A {@link BodyInserter} that remembers the body object it was created with.
 *
 * <p>Insertion is delegated to the inserter obtained from {@code BodyInserters.fromObject(body)};
 * the original object stays available through {@link #getBody()}.
 *
 * @param <T> type of the body object
 */
public class BodyInserterWrapper<T> implements BodyInserter<T, ReactiveHttpOutputMessage> {

    private final T body;
    private final BodyInserter<T, ReactiveHttpOutputMessage> delegate;

    /**
     * @param body the object to use as request body
     */
    public BodyInserterWrapper(T body) {
        this.delegate = BodyInserters.fromObject(body);
        this.body = body;
    }

    /**
     * @return the body object passed to the constructor
     */
    public T getBody() {
        return body;
    }

    /** Forwards to the wrapped inserter. */
    @Override
    public Mono<Void> insert(ReactiveHttpOutputMessage outputMessage, Context context) {
        return delegate.insert(outputMessage, context);
    }
}
Upvotes: 2
Reputation: 41
I found a relatively simple way to do this: implement your own BodyInserter, modelled on BodyInserters.fromValue(), that keeps a reference to the body.
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.lang.Nullable;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.UnsupportedMediaTypeException;
import reactor.core.publisher.Mono;
/**
 * A {@link BodyInserter} that writes a single value with the first suitable
 * {@link HttpMessageWriter} from the context and keeps the value readable via {@link #getBody()}.
 *
 * @param <T> type of the body value
 */
public static class CustomerInserter<T> implements BodyInserter<T, ReactiveHttpOutputMessage> {

    private final T body;

    private CustomerInserter(T body) {
        // Fail at construction time; previously a null body only failed later, inside insert().
        if (body == null) {
            throw new NullPointerException("body must not be null");
        }
        this.body = body;
    }

    /**
     * Creates an inserter for the given value.
     *
     * @param body the value to write; must not be {@code null}
     * @param <T>  type of the value
     * @return a new inserter holding {@code body}
     * @throws NullPointerException if {@code body} is {@code null}
     */
    public static <T> CustomerInserter<T> fromValue(T body) {
        return new CustomerInserter<T>(body);
    }

    /**
     * @return the value this inserter was created with
     */
    public T getBody() {
        return this.body;
    }

    /**
     * Writes the body with the first writer in {@code context} that reports it can write the
     * body's type for the message's content type.
     *
     * @return the result of the chosen writer, or an error Mono carrying an
     *         {@link UnsupportedMediaTypeException} when no writer matches
     */
    @Override
    public Mono<Void> insert(ReactiveHttpOutputMessage outputMessage, Context context) {
        Mono<T> publisher = Mono.just(this.body);
        MediaType mediaType = outputMessage.getHeaders().getContentType();
        ResolvableType bodyType = ResolvableType.forInstance(this.body);
        for (HttpMessageWriter<?> candidate : context.messageWriters()) {
            if (candidate.canWrite(bodyType, mediaType)) {
                // The writer has just accepted bodyType, which was derived from the body itself.
                @SuppressWarnings("unchecked")
                HttpMessageWriter<T> writer = (HttpMessageWriter<T>) candidate;
                return this.write(publisher, bodyType, mediaType, outputMessage, context, writer);
            }
        }
        return Mono.error(unsupportedError(bodyType, context, mediaType));
    }

    /**
     * Invokes the writer, using the server-side overload when the context carries a server
     * request and the plain overload otherwise.
     */
    private Mono<Void> write(Publisher<? extends T> input, ResolvableType type,
            @Nullable MediaType mediaType, ReactiveHttpOutputMessage message,
            BodyInserter.Context context, HttpMessageWriter<T> writer) {
        return context.serverRequest()
                .map(request -> {
                    ServerHttpResponse response = (ServerHttpResponse) message;
                    return writer.write(input, type, type, mediaType, request, response, context.hints());
                })
                .orElseGet(() -> writer.write(input, type, mediaType, message, context.hints()));
    }

    /**
     * Builds the exception reported when no writer matches, listing the media types the
     * context's writers declare as writable for {@code bodyType}.
     */
    private UnsupportedMediaTypeException unsupportedError(ResolvableType bodyType,
            BodyInserter.Context context, @Nullable MediaType mediaType) {
        List<MediaType> supportedMediaTypes = context.messageWriters().stream()
                .flatMap(reader -> reader.getWritableMediaTypes(bodyType).stream())
                .collect(Collectors.toList());
        return new UnsupportedMediaTypeException(mediaType, supportedMediaTypes, bodyType);
    }
}
Simple usage, e.g. in a unit test:
// Send a POST whose body is supplied through CustomerInserter and block for the decoded response.
Response response = webClient.post()
.uri("/xxx")
.body(CustomerInserter.fromValue(body)) // wrap the value so the inserter keeps a reference to it
.retrieve()
.bodyToMono(Response.class)
.block();
// Client whose filter reads the body object back from the request's CustomerInserter.
WebClient webClient = WebClient.builder()
        .baseUrl("http://127.0.0.1:8080")
        .filter((request, next) -> {
            // The filter sees every request sent through this client. Requests built with a
            // different inserter would fail an unconditional cast, so check the type first.
            if (request.body() instanceof CustomerInserter) {
                CustomerInserter<?> inserter = (CustomerInserter<?>) request.body();
                // Some things can be done here
                Object body = inserter.getBody();
            }
            return next.exchange(request);
        }).build();
Upvotes: 4
Reputation: 7815
It may look overly complex for such a simple case, but I had to implement 5 classes to extract the body from a ClientRequest.
It seems to be too much, and I'm still curious if there is a short solution for the problem. You're welcome to propose another answer to this question, so I could accept it.
Unfortunately, it must be said that the design of ClientRequest
, BodyInserters
and most of the other stuff from org.springframework.web.reactive.***
has a lot of room for improvement. At the moment it is just a pile of interfaces with tons of methods each, and it usually takes a lot of effort to test code that depends on those classes.
The main goal is to get this method working:
/**
 * Extracts the body of {@code request} as an instance of {@code clazz} by passing the
 * request's body inserter to an {@link InsertionReceiver} created for that class.
 *
 * @param request request whose body is wanted
 * @param clazz   expected type of the body
 * @param <T>     expected type of the body
 * @return the value returned by the receiver
 */
static <T> T extractBody(ClientRequest request, Class<T> clazz) {
    return InsertionReceiver.forClass(clazz).receiveValue(request.body());
}
Here is the implementation of InsertionReceiver
:
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.web.reactive.function.BodyInserter;
/**
 * Obtains the value that a {@link BodyInserter} would insert.
 *
 * @param <T> type of the value to obtain
 */
public interface InsertionReceiver<T> {

    /**
     * Creates a receiver for values of the given class, backed by {@link SimpleValueReceiver}.
     *
     * @param clazz expected class of the value
     * @param <T>   expected type of the value
     * @return a new receiver
     */
    static <T> InsertionReceiver<T> forClass(Class<T> clazz) {
        return new SimpleValueReceiver<>(clazz);
    }

    /**
     * Obtains the value from the given inserter.
     *
     * @param bodyInserter inserter to take the value from
     * @return the received value
     */
    T receiveValue(BodyInserter<?, ? extends ReactiveHttpOutputMessage> bodyInserter);
}
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.web.reactive.function.BodyInserter;
/**
 * {@link InsertionReceiver} that runs the inserter against a minimal output message and a
 * context whose only writer stores the written value in an {@link AtomicReference}.
 *
 * @param <T> expected type of the received value
 */
class SimpleValueReceiver<T> implements InsertionReceiver<T> {

    /** Sentinel meaning "no value has been received". */
    private static final Object DUMMY = new Object();

    private final Class<T> clazz;
    private final AtomicReference<Object> reference;

    /**
     * @param clazz class the received value must be an instance of
     */
    SimpleValueReceiver(Class<T> clazz) {
        this.clazz = clazz;
        this.reference = new AtomicReference<>(DUMMY);
    }

    /**
     * Runs the inserter and returns the value it wrote.
     *
     * @param bodyInserter inserter to take the value from
     * @return the received value, cast to {@code T}
     * @throws IllegalStateException if no value was received or it is not an instance of the
     *                               expected class
     */
    @Override
    public T receiveValue(BodyInserter<?, ? extends ReactiveHttpOutputMessage> bodyInserter) {
        demandValueFrom(bodyInserter);
        return receivedValue();
    }

    /** Invokes the inserter with a writer that stores whatever is written into {@code reference}. */
    private void demandValueFrom(BodyInserter<?, ? extends ReactiveHttpOutputMessage> bodyInserter) {
        // The message handed over is a plain ReactiveHttpOutputMessage, so the inserter is
        // viewed through that type.
        @SuppressWarnings("unchecked")
        BodyInserter<?, ReactiveHttpOutputMessage> inserter =
                (BodyInserter<?, ReactiveHttpOutputMessage>) bodyInserter;
        // The returned Mono is not subscribed: WriteToConsumer captures the value while its
        // write(...) method runs.
        inserter.insert(
                MinimalHttpOutputMessage.INSTANCE,
                new SingleWriterContext(new WriteToConsumer<>(reference::set))
        );
    }

    /** Takes the stored value, resets the holder to the sentinel, and validates the value's type. */
    private T receivedValue() {
        // Read and reset in one atomic step instead of a separate get() and set().
        Object value = reference.getAndSet(DUMMY);
        if (value == DUMMY) {
            throw new IllegalStateException("Value was not received, Check your inserter worked properly");
        }
        if (!clazz.isInstance(value)) {
            throw new IllegalStateException(
                    "Value has unexpected type ("
                            + value.getClass().getTypeName()
                            + ") instead of (" + clazz.getTypeName() + ")");
        }
        return clazz.cast(value);
    }
}
/**
 * {@link HttpMessageWriter} that writes nothing to the message: it accepts every type and
 * media type and forwards the published value to a {@link Consumer}.
 *
 * @param <T> type of the written value
 */
class WriteToConsumer<T> implements HttpMessageWriter<T> {

    /** The only advertised media type: {@link MediaType#ALL}. */
    private static final List<MediaType> ANY_MEDIA_TYPE = Collections.singletonList(MediaType.ALL);

    private final Consumer<T> consumer;

    /**
     * @param consumer receives the value passed to {@code write}
     */
    WriteToConsumer(Consumer<T> consumer) {
        this.consumer = consumer;
    }

    @Override
    public List<MediaType> getWritableMediaTypes() {
        return ANY_MEDIA_TYPE;
    }

    /** Accepts everything. */
    @Override
    public boolean canWrite(ResolvableType elementType, MediaType mediaType) {
        return true;
    }

    /**
     * Subscribes a {@link OneValueConsumption} to {@code inputStream} immediately and returns
     * an empty Mono; the message, media type and hints are not used.
     */
    @Override
    public Mono<Void> write(
            Publisher<? extends T> inputStream,
            ResolvableType elementType,
            MediaType mediaType,
            ReactiveHttpOutputMessage message,
            Map<String, Object> hints
    ) {
        OneValueConsumption<T> subscriber = new OneValueConsumption<>(consumer);
        inputStream.subscribe(subscriber);
        return Mono.empty();
    }
}
/**
 * Singleton {@link ReactiveHttpOutputMessage} used only as a placeholder target for a body
 * inserter. Its headers are {@link HttpHeaders#EMPTY}.
 */
class MinimalHttpOutputMessage implements ReactiveHttpOutputMessage {

    /** The single shared instance; final so that it cannot be reassigned. */
    public static final MinimalHttpOutputMessage INSTANCE = new MinimalHttpOutputMessage();

    private MinimalHttpOutputMessage() {
    }

    @Override
    public HttpHeaders getHeaders() {
        return HttpHeaders.EMPTY;
    }
    // other overridden methods are omitted as they do nothing,
    // i.e. return null, false, or have empty bodies
}
/**
 * {@link Subscriber} that requests one element and hands it to a {@link Consumer}.
 * A second element, or an error signal, results in a {@link RuntimeException} being thrown.
 *
 * @param <T> type of the consumed value
 */
class OneValueConsumption<T> implements Subscriber<T> {

    private final Consumer<T> consumer;

    /** Number of values that may still be accepted; starts at one. */
    private int acceptsLeft = 1;

    /**
     * @param consumer receives the single value; must not be {@code null}
     */
    public OneValueConsumption(Consumer<T> consumer) {
        this.consumer = Objects.requireNonNull(consumer);
    }

    /** Requests exactly one element. */
    @Override
    public void onSubscribe(Subscription s) {
        s.request(1);
    }

    /** Passes the value to the consumer, or throws if a value was already accepted. */
    @Override
    public void onNext(T o) {
        if (acceptsLeft <= 0) {
            throw new RuntimeException("No more values can be consumed");
        }
        consumer.accept(o);
        acceptsLeft -= 1;
    }

    /** Rethrows the error wrapped in a {@link RuntimeException}. */
    @Override
    public void onError(Throwable t) {
        throw new RuntimeException("Single value was not consumed", t);
    }

    @Override
    public void onComplete() {
        // nothing
    }
}
/**
 * {@link BodyInserter.Context} that offers exactly one {@link HttpMessageWriter}, no server
 * request and no hints.
 */
class SingleWriterContext implements BodyInserter.Context {

    private final List<HttpMessageWriter<?>> singleWriterList;

    /**
     * @param writer the only writer this context exposes
     */
    SingleWriterContext(HttpMessageWriter<?> writer) {
        this.singleWriterList = List.of(writer);
    }

    @Override
    public List<HttpMessageWriter<?>> messageWriters() {
        return singleWriterList;
    }

    @Override
    public Optional<ServerHttpRequest> serverRequest() {
        return Optional.empty();
    }

    /**
     * @return an empty, immutable map. Returning {@code null} here would make any inserter or
     *         writer that reads the hints fail with a {@link NullPointerException}.
     */
    @Override
    public Map<String, Object> hints() {
        return Map.of();
    }
}
Upvotes: 13