Frischling


Convert writes to OutputStream into a Flux<DataBuffer> usable by ServerResponse

I have to use a legacy library to retrieve a file. This library doesn't return an InputStream, as you would usually expect for reading data. Instead, it expects to be passed an open OutputStream that it can write to.

I have to write a WebFlux REST service that writes the content of this OutputStream to the body of an org.springframework.web.reactive.function.server.ServerResponse.

    // writes the blob to an OutputStream that I have to provide,
    // and that somehow has to end up in the ServerResponse
    legacyLib.BlobRead(outputStream);

Since I want to pass the stream directly to the ServerResponse, I probably have to do something like this, right?

    ServerResponse.ok().body(magicOutpuStreamToFluxConverter(), DataBuffer.class);

Upvotes: 5


Answers (1)

Frischling


Here is the important part of the RequestHandler. I left out some error handling that should not generally be needed. Note that the blocking read is meant to run on a different Scheduler, so that it doesn't interfere with my main event thread:

    private Mono<ServerResponse> writeToServerResponse(@NotNull FPTag tag) {
        final long blobSize = tag.getBlobSize();
        final DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();
        Flux<DataBuffer> body = Flux.<DataBuffer>create(emitter -> {
            // read a really big blob in chunks, so that the server doesn't use too much memory
            // (the int offset limits this loop to blobs smaller than 2 GiB)
            for (int i = 0; i < blobSize && !emitter.isCancelled(); i += tagChunkSize) {
                // new DataBuffer that is written to, then emitted below
                DefaultDataBuffer dataBuffer = bufferFactory.allocateBuffer();
                try (OutputStream outputStream = dataBuffer.asOutputStream()) {
                    // write this chunk to the OutputStream of the DataBuffer
                    tag.BlobReadPartial(outputStream, i, tagChunkSize, FPLibraryConstants.FP_OPTION_DEFAULT_OPTIONS);
                    // flushing is probably not strictly necessary
                    outputStream.flush();
                } catch (IOException | FPLibraryException e) {
                    log.error("Error reading + writing from tag to http outputstream", e);
                    emitter.error(e);
                    // stop here: nothing may be emitted after an error signal
                    return;
                }
                emitter.next(dataBuffer);
            }
            // the blob is finished, so send "complete" to the flux of DataBuffers
            emitter.complete();
        }, FluxSink.OverflowStrategy.BUFFER)
            // subscribeOn (not publishOn) runs the blocking loop above off the event thread;
            // publishOn would only move the downstream signals to another thread.
            // Schedulers.newElastic(...) is deprecated, boundedElastic() is its replacement.
            .subscribeOn(Schedulers.boundedElastic())
            // doFinally also closes the tag on error and cancellation, not only on completion
            .doFinally(signal -> closeQuietly(tag));

        return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_OCTET_STREAM)
            .body(body, DataBuffer.class);
    }
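
If the library only offers a full read into an OutputStream (like the BlobRead(outputStream) call in the question) and no partial read, one possible approach is to hand it an OutputStream that cuts whatever is written into fixed-size chunks and passes each chunk to a callback. The following is only a minimal sketch using plain JDK classes. ChunkingOutputStream is a hypothetical helper name, not part of Spring or of the legacy library. In a WebFlux handler, the callback would presumably wrap each chunk with DataBufferFactory.wrap(byte[]) and pass it to FluxSink.next, with the library call still running on a scheduler meant for blocking work.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: collects written bytes and hands them to a callback in fixed-size chunks. */
final class ChunkingOutputStream extends OutputStream {
    private final byte[] buffer;
    private final Consumer<byte[]> chunkConsumer;
    private int count = 0; // number of valid bytes currently held in buffer

    ChunkingOutputStream(int chunkSize, Consumer<byte[]> chunkConsumer) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.buffer = new byte[chunkSize];
        this.chunkConsumer = chunkConsumer;
    }

    @Override
    public void write(int b) {
        buffer[count++] = (byte) b;
        if (count == buffer.length) {
            emitChunk();
        }
    }

    @Override
    public void write(byte[] src, int off, int len) {
        // copy the input piece by piece, emitting a chunk each time the buffer fills up
        while (len > 0) {
            int n = Math.min(len, buffer.length - count);
            System.arraycopy(src, off, buffer, count, n);
            count += n;
            off += n;
            len -= n;
            if (count == buffer.length) {
                emitChunk();
            }
        }
    }

    /** Emits the remaining partial chunk, if there is one. */
    @Override
    public void close() {
        if (count > 0) {
            emitChunk();
        }
    }

    private void emitChunk() {
        // hand out a copy, because the internal buffer is reused for the next chunk
        chunkConsumer.accept(Arrays.copyOf(buffer, count));
        count = 0;
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> chunks = new ArrayList<>();
        // stands in for legacyLib.BlobRead(out): any code that writes to the stream
        try (OutputStream out = new ChunkingOutputStream(4, chunks::add)) {
            out.write("hello world".getBytes(StandardCharsets.US_ASCII));
        }
        for (byte[] chunk : chunks) {
            // prints "hell", "o wo" and "rld" on separate lines
            System.out.println(new String(chunk, StandardCharsets.US_ASCII));
        }
    }
}
```

The last chunk is only emitted when the stream is closed, so the stream has to be closed (for example with try-with-resources) before the flux is completed.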

Upvotes: 4
