diziaq

Reputation: 7815

How to get the body as a String from a Spring reactive ClientRequest?

In a test method an instance of org.springframework.web.reactive.function.client.ClientRequest is received.

I want to verify its HttpMethod, URI and body.

It is quite obvious how to get everything except for the body.

ClientRequest request = makeInstance(...);

assertEquals(HttpMethod.POST, request.method());
assertEquals("somewhere/else", request.url().toString());

// ? unclear how to extract body using the BodyInserter

BodyInserter<?, ? super ClientHttpRequest> inserter = request.body();

inserter.insert(%outputMessage%, %context%);

I have looked at how BodyInserters are tested in the Spring sources. It is more or less clear how to create a BodyInserter.Context (the second parameter), but I cannot understand how to construct the first parameter so that the request body can be extracted through it.

Please show a conventional (or at least usable) way to get the request body from a ClientRequest instance.

Upvotes: 15

Views: 18342

Answers (4)

Kirill Mikhailov

Reputation: 1079

This gets the request body from a ClientRequest. In the example below:

  1. MyFilter is registered in the WebClient builder.
  2. MyFilter writes the ClientRequest into a RequestBodyCollector to extract the request body.
  3. RequestBodyCollector overrides the write* methods (writeWith and writeAndFlushWith) to capture the body bytes.

Code:

@Bean
public WebClient myWebClient(final MyFilter myFilter, final WebClient.Builder builder, final ReactorClientHttpConnector connector) throws SSLException {
    return builder.clientConnector(connector)
            .baseUrl("someUrl")
            .filter(myFilter)
            .build();
}

public class MyFilter implements ExchangeFilterFunction {
    @Override
    public Mono<ClientResponse> filter(final ClientRequest request, final ExchangeFunction next) {
        var requestBodyCollector = new RequestBodyCollector(request);
        // First write the request into the collector, then perform the real exchange.
        return request.writeTo(requestBodyCollector, ExchangeStrategies.withDefaults())
                .then(Mono.defer(() -> next.exchange(request)
                        .doOnNext(response -> {
                            // do something with request body; the response is still passed downstream
                            var body = requestBodyCollector.getBodyBytes();
                        })));
    }
}

@RequiredArgsConstructor
class RequestBodyCollector implements ClientHttpRequest {
    private final ClientRequest clientRequest;
    private List<byte[]> bodyByteList = new ArrayList<>();

    public byte[] getBodyBytes() {
        return getBytes(bodyByteList);
    }

    @Override
    public Mono<Void> writeWith(final Publisher<? extends DataBuffer> body) {
        return Flux.from(body)
                .map(dataBuffer -> collectBytes(dataBuffer, bodyByteList))
                .then();
    }

    @Override
    public Mono<Void> writeAndFlushWith(final Publisher<? extends Publisher<? extends DataBuffer>> body) {
        return writeWith(Flux.from(body)
                .flatMap(Function.identity()))
                .doOnDiscard(DataBuffer.class, DataBufferUtils::release);
    }

    static DataBuffer collectBytes(final DataBuffer dataBuffer, final List<byte[]> bodyByteList) {
        final byte[] bytes = new byte[dataBuffer.readableByteCount()];
        final DataBuffer retained = dataBuffer.retainedSlice(0, bytes.length);
        dataBuffer.read(bytes);
        DataBufferUtils.release(dataBuffer);
        bodyByteList.add(bytes);
        return retained;
    }

    static byte[] getBytes(final List<byte[]> list) {
        var buffer = ByteBuffer.wrap(new byte[list.stream()
                .mapToInt(bytes -> bytes.length)
                .sum()]);
        list.forEach(buffer::put);
        return buffer.array();
    }

    @Override
    public HttpMethod getMethod() {
        return clientRequest.method();
    }

    @Override
    public URI getURI() {
        return clientRequest.url();
    }

    @Override
    public MultiValueMap<String, HttpCookie> getCookies() {
        return new LinkedMultiValueMap<>();
    }

    @Override
    public <T> T getNativeRequest() {
        return null;
    }

    @Override
    public DataBufferFactory bufferFactory() {
        return DefaultDataBufferFactory.sharedInstance;
    }

    @Override
    public void beforeCommit(final Supplier<? extends Mono<Void>> action) {
    }

    @Override
    public boolean isCommitted() {
        return false;
    }

    @Override
    public Mono<Void> setComplete() {
        return Mono.empty();
    }

    @Override
    public HttpHeaders getHeaders() {
        return new HttpHeaders();
    }
}
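Since the question asks for the body as a String, the collected chunks still have to be decoded. Below is a minimal sketch in plain Java, assuming the request body is UTF-8 text. The class and method names (BodyBytes, join, asUtf8String) are hypothetical helpers for illustration and are not part of Spring.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical helper, not part of Spring.
final class BodyBytes {

    private BodyBytes() {
    }

    // Concatenates the chunks in the order they were collected.
    static byte[] join(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            out.write(chunk, 0, chunk.length);
        }
        return out.toByteArray();
    }

    // Decodes the joined bytes; UTF-8 is an assumption about the request encoding.
    static String asUtf8String(List<byte[]> chunks) {
        return new String(join(chunks), StandardCharsets.UTF_8);
    }
}
```

The chunks are joined before decoding because a multi-byte character may be split across two buffers, and decoding each chunk separately would corrupt it. With the collector above, the equivalent call would be new String(requestBodyCollector.getBodyBytes(), StandardCharsets.UTF_8).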

Upvotes: 0

Yuliia Panchenko

Reputation: 21

A simple wrapper can be implemented to hold the body object and expose it through a getter:

public class BodyInserterWrapper<T> implements BodyInserter<T, ReactiveHttpOutputMessage> {

    private final BodyInserter<T, ReactiveHttpOutputMessage> delegate;
    @Getter
    private final T body;

    public BodyInserterWrapper(T body) {
        this.body = body;
        // fromObject is deprecated since Spring 5.2; use fromObject(body) only on older versions
        this.delegate = BodyInserters.fromValue(body);
    }

    @Override
    public Mono<Void> insert(ReactiveHttpOutputMessage outputMessage, Context context) {
        return this.delegate.insert(outputMessage, context);
    }
}

Upvotes: 2

0x233

Reputation: 41

I found a relatively simple way to do this: implement your own BodyInserter that imitates BodyInserters.fromValue() and keeps a reference to the body.

import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.lang.Nullable;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.UnsupportedMediaTypeException;
import reactor.core.publisher.Mono;

public static class CustomerInserter<T> implements BodyInserter<T, ReactiveHttpOutputMessage> {

    private T body;

    private CustomerInserter(T body) {
        this.body = body;
    }

    public static <T> CustomerInserter<T> fromValue(T body) {
        return new CustomerInserter<T>(body);
    }

    public T getBody() {
        return this.body;
    }

    @Override
    public Mono<Void> insert(ReactiveHttpOutputMessage outputMessage, Context context) {
        Mono<T> publisher = Mono.just(this.body);
        MediaType mediaType = outputMessage.getHeaders().getContentType();
        ResolvableType bodyType = ResolvableType.forInstance(this.body);
        return context.messageWriters().stream()
                .filter(messageWriter -> messageWriter.canWrite(bodyType, mediaType))
                .findFirst()
                .map(item -> (HttpMessageWriter<T>) item)
                .map(writer -> this.write(publisher, bodyType, mediaType, outputMessage, context, writer))
                .orElseGet(() -> Mono.error(unsupportedError(bodyType, context, mediaType)));
    }

    private Mono<Void> write(Publisher<? extends T> input, ResolvableType type,
                                        @Nullable MediaType mediaType, ReactiveHttpOutputMessage message,
                                        BodyInserter.Context context, HttpMessageWriter<T> writer) {

        return context.serverRequest()
                .map(request -> {
                    ServerHttpResponse response = (ServerHttpResponse) message;
                    return writer.write(input, type, type, mediaType, request, response, context.hints());
                })
                .orElseGet(() -> writer.write(input, type, mediaType, message, context.hints()));
    }

    private UnsupportedMediaTypeException unsupportedError(ResolvableType bodyType,
                                                   BodyInserter.Context context, @Nullable MediaType mediaType) {

        List<MediaType> supportedMediaTypes = context.messageWriters().stream()
                .flatMap(writer -> writer.getWritableMediaTypes(bodyType).stream())
                .collect(Collectors.toList());

        return new UnsupportedMediaTypeException(mediaType, supportedMediaTypes, bodyType);
    }
}

A simple usage example. The filter reads the body back from the inserter:

WebClient webClient = WebClient.builder()
            .baseUrl("http://127.0.0.1:8080")
            .filter((request, next) -> {
                // Works only if the request was built with CustomerInserter
                CustomerInserter<?> inserter = (CustomerInserter<?>) request.body();
                // The original body object is available here
                Object body = inserter.getBody();
                return next.exchange(request);
            }).build();

Response response = webClient.post()
            .uri("/xxx")
            .body(CustomerInserter.fromValue(body))
            .retrieve()
            .bodyToMono(Response.class)
            .block();

Upvotes: 4

diziaq

Reputation: 7815

It is a bit complex for such a simple case: I had to implement 5 classes to extract the body from a ClientRequest.

It seems to be too much, and I'm still curious if there is a short solution for the problem. You're welcome to propose another answer to this question, so I could accept it.

Unfortunately, the design of ClientRequest, BodyInserters and most of the other types in org.springframework.web.reactive.* leaves a lot of room for improvement. At the moment it is a pile of interfaces with many methods each, and testing code that depends on those classes usually takes a lot of effort.

The main goal is to get this method working:

static <T> T extractBody(ClientRequest request, Class<T> clazz) {

  InsertionReceiver<T> receiver = InsertionReceiver.forClass(clazz);
  return receiver.receiveValue(request.body());
}

Here is the implementation of InsertionReceiver:


import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.web.reactive.function.BodyInserter;

public interface InsertionReceiver<T> {

  T receiveValue(BodyInserter<?, ? extends ReactiveHttpOutputMessage> bodyInserter);

  static <T> InsertionReceiver<T> forClass(Class<T> clazz) {
    return new SimpleValueReceiver<>(clazz);
  }
}

import java.util.concurrent.atomic.AtomicReference;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.web.reactive.function.BodyInserter;
    
class SimpleValueReceiver<T> implements InsertionReceiver<T> {

  private static final Object DUMMY = new Object();

  private final Class<T> clazz;
  private final AtomicReference<Object> reference;

  SimpleValueReceiver(Class<T> clazz) {
    this.clazz = clazz;
    this.reference = new AtomicReference<>(DUMMY);
  }

  @Override
  public T receiveValue(BodyInserter<?, ? extends ReactiveHttpOutputMessage> bodyInserter) {
    demandValueFrom(bodyInserter);

    return receivedValue();
  }

  private void demandValueFrom(BodyInserter<?, ? extends ReactiveHttpOutputMessage> bodyInserter) {    
    var inserter = (BodyInserter<?, ReactiveHttpOutputMessage>) bodyInserter;

    inserter.insert(
        MinimalHttpOutputMessage.INSTANCE,
        new SingleWriterContext(new WriteToConsumer<>(reference::set))
    );
  }

  private T receivedValue() {
    Object value = reference.get();
    reference.set(DUMMY);

    T validatedValue;

    if (value == DUMMY) {
      throw new RuntimeException("Value was not received. Check that your inserter worked properly");
    } else if (!clazz.isAssignableFrom(value.getClass())) {
      throw new RuntimeException(
          "Value has unexpected type ("
              + value.getClass().getTypeName()
              + ") instead of (" + clazz.getTypeName() + ")");
    } else {
      validatedValue = clazz.cast(value);
    }

    return validatedValue;
  }
}

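import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.codec.HttpMessageWriter;
import reactor.core.publisher.Mono;

class WriteToConsumer<T> implements HttpMessageWriter<T> {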

  private final Consumer<T> consumer;
  private final List<MediaType> mediaTypes;

  WriteToConsumer(Consumer<T> consumer) {
    this.consumer = consumer;
    this.mediaTypes = Collections.singletonList(MediaType.ALL);
  }

  @Override
  public List<MediaType> getWritableMediaTypes() {
    return mediaTypes;
  }

  @Override
  public boolean canWrite(ResolvableType elementType, MediaType mediaType) {
    return true;
  }

  @Override
  public Mono<Void> write(
      Publisher<? extends T> inputStream,
      ResolvableType elementType,
      MediaType mediaType,
      ReactiveHttpOutputMessage message,
      Map<String, Object> hints
  ) {
    inputStream.subscribe(new OneValueConsumption<>(consumer));
    return Mono.empty();
  }
}

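import org.springframework.http.HttpHeaders;
import org.springframework.http.ReactiveHttpOutputMessage;

class MinimalHttpOutputMessage implements ReactiveHttpOutputMessage {

  public static final MinimalHttpOutputMessage INSTANCE = new MinimalHttpOutputMessage();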

  private MinimalHttpOutputMessage() {
  }

  @Override
  public HttpHeaders getHeaders() {
    return HttpHeaders.EMPTY;
  }

  // other overridden methods are omitted as they do nothing,
  // i.e. return null, false, or have empty bodies
}

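import java.util.Objects;
import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class OneValueConsumption<T> implements Subscriber<T> {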

  private final Consumer<T> consumer;
  private int remainedAccepts;

  public OneValueConsumption(Consumer<T> consumer) {
    this.consumer = Objects.requireNonNull(consumer);
    this.remainedAccepts = 1;
  }

  @Override
  public void onSubscribe(Subscription s) {
    s.request(1);
  }

  @Override
  public void onNext(T o) {
    if (remainedAccepts > 0) {
      consumer.accept(o);
      remainedAccepts -= 1;
    } else {
      throw new RuntimeException("No more values can be consumed");
    }
  }

  @Override
  public void onError(Throwable t) {
    throw new RuntimeException("Single value was not consumed", t);
  }

  @Override
  public void onComplete() {
    // nothing
  }
}

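import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.reactive.function.BodyInserter;

class SingleWriterContext implements BodyInserter.Context {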

  private final List<HttpMessageWriter<?>> singleWriterList;

  SingleWriterContext(HttpMessageWriter<?> writer) {
    this.singleWriterList = List.of(writer);
  }

  @Override
  public List<HttpMessageWriter<?>> messageWriters() {
    return singleWriterList;
  }

  @Override
  public Optional<ServerHttpRequest> serverRequest() {
    return Optional.empty();
  }

  @Override
  public Map<String, Object> hints() {
    // An empty map avoids a NullPointerException in writers that read the hints
    return Map.of();
  }
}
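The capture idea behind WriteToConsumer and OneValueConsumption can be tried out without Spring on the classpath. The sketch below is only an illustration under stated assumptions: it swaps in the JDK's java.util.concurrent.Flow API in place of Reactive Streams and Reactor, and every name in it (CaptureSketch, just, forwardingTo, capture) is hypothetical.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-ins built on the JDK Flow API, not on Reactor or Spring.
final class CaptureSketch {

    private CaptureSketch() {
    }

    // Publishes one value synchronously once the subscriber requests it.
    static <T> Flow.Publisher<T> just(T value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (!done && n > 0) {
                    done = true;
                    subscriber.onNext(value);
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Requests a single element and forwards it to the consumer.
    static <T> Flow.Subscriber<T> forwardingTo(Consumer<T> consumer) {
        return new Flow.Subscriber<>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(1);
            }

            @Override
            public void onNext(T item) {
                consumer.accept(item);
            }

            @Override
            public void onError(Throwable throwable) {
                // ignored in this sketch
            }

            @Override
            public void onComplete() {
                // nothing to do
            }
        };
    }

    // Subscribes and returns the captured value, or null if nothing arrived.
    static <T> T capture(Flow.Publisher<T> publisher) {
        AtomicReference<T> reference = new AtomicReference<>();
        Flow.Subscriber<T> subscriber = forwardingTo(reference::set);
        publisher.subscribe(subscriber);
        return reference.get();
    }
}
```

This only works because the publisher emits synchronously during subscribe, which is the same condition the Spring-based classes above rely on: the value must have reached the consumer before receivedValue() reads the reference.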

Upvotes: 13
