Reputation: 2268
I have a legacy library that I have to use to retrieve a file. This legacy library doesn't return in InputStream, as you usually expect for reading stuff, but it expects that it is passed an open OutputStream, that it can write to.
I have to write a Webflux REST service, that writes this OutputStream to the org.springframework.web.reactive.function.server.ServerResponse body.
legacyLib.BlobRead(outputStream); // writes the stream to an outputstream, that has to be provided by me, and somehow has to end up in the ServerResponse
Since I want to pass along the Stream directly to the ServerResponse, I probably have to do something like this, right?
ServerResponse.ok().body(magicOutpuStreamToFluxConverter(), DataBuffer.class);
Upvotes: 5
Views: 8521
Reputation: 2268
Here is the part of the RequestHandler that's important. I left out some errorhandling/catching of exceptions, that might generally not be needed. Note that I publishedOn
a different Scheduler
for the read (or at least, that's what I wanted to do), so that this blocking read doesn't interfere with my main event thread:
private Mono<ServerResponse> writeToServerResponse(@NotNull FPTag tag) {
final long blobSize = tag.getBlobSize();
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_OCTET_STREAM)
.body(Flux.<DataBuffer>create((FluxSink<DataBuffer> emitter) -> {
// for a really big blob I want to read it in chunks, so that my server doesn't use too much memory
for(int i = 0; i < blobSize; i+= tagChunkSize) {
// new DataBuffer that is written to, then emitted later
DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory().allocateBuffer();
try (OutputStream outputStream = dataBuffer.asOutputStream()) {
// write to the outputstream of DataBuffer
tag.BlobReadPartial(outputStream, i, tagChunkSize, FPLibraryConstants.FP_OPTION_DEFAULT_OPTIONS);
// don't know if flushing is strictly neccessary
outputStream.flush();
} catch (IOException | FPLibraryException e) {
log.error("Error reading + writing from tag to http outputstream", e);
emitter.error(e);
}
emitter.next(dataBuffer);
}
// if blob is finished, send "complete" to my flux of DataBuffers
emitter.complete();
}, FluxSink.OverflowStrategy.BUFFER).publishOn(Schedulers.newElastic("centera")).doOnComplete(() -> closeQuietly(tag)), DataBuffer.class);
}
Upvotes: 4