Reputation: 333
I'm working on a greenfield reactive project that does a lot of file-handling IO. Is it sufficient to write the IO code in an imperative, blocking manner, wrap it in a Mono, and publish it on the boundedElastic scheduler? Will the boundedElastic pool size limit the number of concurrent operations?
If this is not the correct approach, can you show an example of how to write bytes to a file using Reactor?
Upvotes: 4
Views: 4091
Reputation: 4281
After some research into java.nio and the Spring libraries, I found a convenient approach: write strings to a file as DataBuffers (which fit in well with WebFlux) through an AsynchronousFileChannel, using Spring classes.
It's not a "truly" reactive way to write lines to a file, but it is asynchronous, and it is still better than using a standard blocking API.
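    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;
    import org.springframework.core.ResolvableType;
    import org.springframework.core.codec.CharSequenceEncoder;
    import org.springframework.core.io.buffer.DataBuffer;
    import org.springframework.core.io.buffer.DataBufferUtils;
    import org.springframework.core.io.buffer.DefaultDataBufferFactory;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public Mono<Void> writeRows(Flux<String> rowsFlux) {
        DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();
        CharSequenceEncoder encoder = CharSequenceEncoder.textPlainOnly();
        // Encode each incoming line into its own DataBuffer.
        Flux<DataBuffer> dataBufferFlux = rowsFlux.map(line ->
                encoder.encodeValue(line, bufferFactory, ResolvableType.NONE, null, null));
        // CREATE_NEW makes the write fail if the file already exists.
        return DataBufferUtils.write(
                dataBufferFlux,
                Path.of("/your_path_to_save_file/file.txt"),
                StandardOpenOption.CREATE_NEW
        );
    }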
Of course, for better performance you can buffer the strings in the Flux, join each batch into a single string, and create one data buffer from it.
Or, if you already have a Flux of data buffers, you can write them to a file using DataBufferUtils directly.
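To make the batching idea above a bit more concrete, here is a minimal sketch in plain Java, deliberately without Reactor or Spring types so that it runs on its own. LineBatcher and encodeInBatches are hypothetical names, not part of any library. In a Reactor pipeline the same grouping would presumably be rowsFlux.buffer(n) followed by a map that joins each list, before the result is handed to the encoder.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration: joins lines in groups and encodes each group once. */
final class LineBatcher {

    /** Returns one UTF-8 encoded buffer per group of up to batchSize lines. */
    static List<ByteBuffer> encodeInBatches(List<String> lines, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<ByteBuffer> buffers = new ArrayList<>();
        for (int start = 0; start < lines.size(); start += batchSize) {
            int end = Math.min(start + batchSize, lines.size());
            // Join the batch into a single string so it needs only one buffer (and one write).
            String joined = String.join("", lines.subList(start, end));
            buffers.add(ByteBuffer.wrap(joined.getBytes(StandardCharsets.UTF_8)));
        }
        return buffers;
    }
}
```

With a batch size of 2, three lines become two buffers, so the file channel sees two writes instead of three. The trade-off is that each batch is held in memory until it has been joined.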
Upvotes: 2
Reputation: 72379
Is it sufficient to write the IO code in an imperative, blocking manner, wrap it in a Mono, and publish it on the boundedElastic scheduler?
This comes down to opinion on some level, but no, it's certainly not ideal for a reactive greenfield project, IMHO. boundedElastic() schedulers are great for interfacing with blocking IO when you must, but they're not a good replacement when a true non-blocking solution exists. (Sometimes this is a bit of a moot point with file handling, since it depends on whether the underlying system can do it asynchronously, but usually that's possible these days.)
In your case, I'd look at wrapping AsynchronousFileChannel in a reactive publisher. You'll need to use create() or push() for this and then make explicit calls to the sink, but exactly how you do this depends on your use case. As a "simplest case" for file writing, you could feasibly do something like:
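    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousFileChannel;
    import java.nio.channels.CompletionHandler;
    import java.nio.charset.StandardCharsets;
    import reactor.core.publisher.Mono;

    static Mono<Void> writeToFile(AsynchronousFileChannel channel, String content) {
        return Mono.create(sink -> {
            // Encode explicitly rather than relying on the platform default charset.
            ByteBuffer buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));
            // Write at file position 0; the handler bridges the callback to the sink.
            // Note: a single write() may not drain the buffer; this simplest case ignores that.
            channel.write(buffer, 0, null, new CompletionHandler<>() {
                @Override
                public void completed(Integer result, Object attachment) {
                    sink.success();
                }

                @Override
                public void failed(Throwable exc, Object attachment) {
                    sink.error(exc);
                }
            });
        });
    }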
A more thorough example of bridging the two APIs can be found here - there are almost certainly others around too.
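One thing the simplest case above does not handle is a partial write: a single write() call on the channel is allowed to consume only part of the buffer. Below is a rough sketch of a handler that re-issues the write until the buffer is drained. It uses a plain CompletableFuture rather than a MonoSink so that it runs without Reactor on the classpath; AsyncFileWriter and writeFully are made-up names for illustration, and opening the file with CREATE, WRITE and TRUNCATE_EXISTING is an assumption about the use case. In the Mono.create version, done.complete(null) would become sink.success() and done.completeExceptionally(exc) would become sink.error(exc).

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper for illustration: writes a whole string, re-issuing partial writes. */
final class AsyncFileWriter {

    static CompletableFuture<Void> writeFully(Path path, String content) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        ByteBuffer buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));
        final AsynchronousFileChannel channel;
        try {
            // Assumption: create the file if needed and overwrite any existing content.
            channel = AsynchronousFileChannel.open(path,
                    StandardOpenOption.CREATE,
                    StandardOpenOption.WRITE,
                    StandardOpenOption.TRUNCATE_EXISTING);
        } catch (IOException e) {
            done.completeExceptionally(e);
            return done;
        }
        // The attachment carries the file position at which the pending write started.
        channel.write(buffer, 0L, 0L, new CompletionHandler<Integer, Long>() {
            @Override
            public void completed(Integer written, Long position) {
                long next = position + written;
                if (buffer.hasRemaining()) {
                    // Partial write: continue from where the previous write stopped.
                    channel.write(buffer, next, next, this);
                } else {
                    closeQuietly();
                    done.complete(null); // Reactor equivalent: sink.success()
                }
            }

            @Override
            public void failed(Throwable exc, Long position) {
                closeQuietly();
                done.completeExceptionally(exc); // Reactor equivalent: sink.error(exc)
            }

            private void closeQuietly() {
                try {
                    channel.close();
                } catch (IOException ignored) {
                    // Nothing useful to do if closing fails in this sketch.
                }
            }
        });
        return done;
    }
}
```

If you want a Mono from this as-is, Mono.defer(() -> Mono.fromFuture(AsyncFileWriter.writeFully(path, content))) should keep it lazy, so the file is only opened on subscription.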
Upvotes: 3