Rahul Sharma

Reputation: 5834

How to convert Flux of ByteBuffer to Spring BodyInserter

I have a use case where I need to read a file from S3 and publish it to a REST service in Java.
For the implementation, I am trying the AWS SDK S3 API to read the file, which returns a Flux<ByteBuffer>, and then publish it to the REST service using the Spring WebClient.

From what I have found, the Spring WebClient requires a BodyInserter, which can be created using BodyInserters.fromDataBuffers. I am unable to figure out how to properly convert the Flux<ByteBuffer> to a Flux<DataBuffer> and call WebClient exchange:

Flux<ByteBuffer> byteBufferFlux = getS3File(key);
Flux<DataBuffer> dataBufferFlux = byteBufferFlux.map(byteBuffer -> {
    // ??? convert ByteBuffer to DataBuffer ???
    return dataBuffer;
});

BodyInserter<Flux<DataBuffer>, ReactiveHttpOutputMessage> inserter = BodyInserters.fromDataBuffers(dataBufferFlux);

Any suggestions on how to achieve this?

Upvotes: 3

Views: 4698

Answers (2)

123

Reputation: 11216

You can convert using DefaultDataBuffer, which you can create via the DefaultDataBufferFactory:

DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();

Flux<DataBuffer> buffer = getS3File(key).map(dataBufferFactory::wrap);

BodyInserter<Flux<DataBuffer>, ReactiveHttpOutputMessage> inserter =
    BodyInserters.fromDataBuffers(buffer);
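A side note on what wrapping implies: the Javadoc for wrap says that, unlike allocating, it does not use new memory, so the resulting DataBuffer should be treated as a view over the source ByteBuffer rather than a copy. The sketch below uses only java.nio to illustrate the view-versus-copy difference; it is an analogy, not Spring's implementation, and the class and method names (ByteBufferViewDemo, view, copy, text) are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, for illustration only (not part of Spring or the AWS SDK).
final class ByteBufferViewDemo {

    /** Returns a view over the unread bytes of source; no bytes are copied. */
    static ByteBuffer view(ByteBuffer source) {
        return source.slice();
    }

    /** Returns an independent copy of the unread bytes of source. */
    static ByteBuffer copy(ByteBuffer source) {
        ByteBuffer target = ByteBuffer.allocate(source.remaining());
        // duplicate() shares content but has its own position, so source is not advanced
        target.put(source.duplicate());
        target.flip();
        return target;
    }

    /** Decodes the unread bytes as UTF-8 without moving the buffer's position. */
    static String text(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    public static void main(String[] args) {
        byte[] bytes = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer source = ByteBuffer.wrap(bytes);

        ByteBuffer view = view(source);
        ByteBuffer copy = copy(source);

        // Mutate the backing array after both buffers have been created
        bytes[0] = 'j';

        System.out.println(text(view)); // the view sees the mutation
        System.out.println(text(copy)); // the copy keeps the original bytes
    }
}
```

If whatever produces the ByteBuffers were to reuse them after emitting (an assumption worth checking for your source, not something stated here), a wrapped buffer would observe those changes, whereas a copy would not.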

You don't actually need a BodyInserter at all, though. If you are using WebClient, you can use the following overload of body():

<T, P extends Publisher<T>> RequestHeadersSpec<?> body(P publisher, Class<T> elementClass);

You can then pass your Flux<ByteBuffer> directly into it, specifying the element class to use:

    WebClient.create("http://someUrl")
            .post()
            .uri("/someUri")
            .body(getS3File(key), ByteBuffer.class)
            .retrieve()
            // nothing is sent until the returned Mono is subscribed to
            .bodyToMono(Void.class);

Upvotes: 3

Manish Maheshwari

Reputation: 4134

You may not need dataBufferFlux at all and should be able to write the Flux<ByteBuffer> directly to your REST endpoint. Try this:

  Flux<ByteBuffer> byteBufferFlux = getS3File(key);

  BodyInserter<Flux<ByteBuffer>, ReactiveHttpOutputMessage> inserter = BodyInserters.fromPublisher(byteBufferFlux, ByteBuffer.class);

Upvotes: 1
