I have a use case where I receive a continuous stream of key/value pairs. I need to group the values by key and accumulate them until the accumulated size in bytes reaches a predefined threshold. When the threshold is met for a key, the pipeline should emit that key's buffer and start a new one. In addition to this size-based boundary there is a timeout: when it elapses, the current batch should be emitted and accumulation restarted.
Using one of the bufferXXX or windowXXX operators in Flux could potentially solve the problem, but those implementations only consider the number of queued elements when testing the boundary condition. Another option would be to use a predicate to define a custom boundary, but in that case it is difficult to apply the timeout condition. Also, this implementation uses lock-based synchronization, which I would like to avoid.
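To make the intended semantics concrete, here is a minimal plain-Java sketch of the rules, independent of Reactor. The names `KeyedBatcher`, `add`, and `flushExpired` are made up for illustration, the caller supplies the clock, and the class is single-threaded, so it is only a reference model of the behaviour I want, not the implementation I am after.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical reference model of the batching rules; not thread-safe, not a Reactor operator.
final class KeyedBatcher {
    // One open batch per key, with its accumulated UTF-8 size and start time.
    private static final class Batch {
        final List<String> values = new ArrayList<>();
        int bytes;
        long startedAtMillis;
    }

    private final int maxBytes;
    private final long timeoutMillis;
    private final Map<String, Batch> open = new HashMap<>();

    KeyedBatcher(int maxBytes, long timeoutMillis) {
        this.maxBytes = maxBytes;
        this.timeoutMillis = timeoutMillis;
    }

    // Adds a value; returns the completed batch once the byte threshold is reached,
    // otherwise an empty list.
    List<String> add(String key, String value, long nowMillis) {
        Batch batch = open.computeIfAbsent(key, k -> {
            Batch b = new Batch();
            b.startedAtMillis = nowMillis;
            return b;
        });
        batch.values.add(value);
        batch.bytes += value.getBytes(StandardCharsets.UTF_8).length;
        if (batch.bytes >= maxBytes) {
            open.remove(key); // the next value for this key starts a fresh batch
            return batch.values;
        }
        return List.of();
    }

    // Emits every batch that has been open for at least timeoutMillis (key -> values).
    Map<String, List<String>> flushExpired(long nowMillis) {
        Map<String, List<String>> expired = new HashMap<>();
        Iterator<Map.Entry<String, Batch>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Batch> entry = it.next();
            if (nowMillis - entry.getValue().startedAtMillis >= timeoutMillis) {
                expired.put(entry.getKey(), entry.getValue().values);
                it.remove();
            }
        }
        return expired;
    }
}
```

For example, with `new KeyedBatcher(8, 1000)`, two 4-byte values for the same key complete a batch on the second `add`, while a lone value for another key is only emitted by a later `flushExpired` call once 1000 ms have passed since its first element.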
Is there any other good way to achieve this?
Here is a simplified version of the workflow I am thinking of implementing.
Initialization of an unbounded Flux:
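    // Field implied by the assignment in initializePipeline().
    private Bridge<Pair> bridge;

    private void initializePipeline() {
        bridge = new Bridge<>();
        Flux.<Pair>create(sink -> bridge.sink = sink)
            // Split the stream into one GroupedFlux of values per key.
            .groupBy(Pair::key, Pair::value)
            // One Boundary per group; cutBefore = true puts the triggering element in the next buffer.
            .flatMap(gfp -> gfp.bufferUntil(new Boundary(), true)
                .doOnNext(c -> process(c)))
            .subscribe();
    }

    public record Pair(String key, String value) {}

    // Holds the FluxSink so that values can be pushed into the pipeline from outside.
    private static class Bridge<T> {
        private FluxSink<T> sink;
        void emit(T value) { sink.next(value); }
        void error(Throwable error) { sink.error(error); }
        void complete() { sink.complete(); }
    }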
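The `Boundary` predicate is not shown above. A minimal sketch of what I have in mind is below; the 1024-byte default, the UTF-8 size measure, and the constructor taking `maxBytes` are assumptions made for illustration. It is stateful, so each group needs its own instance, and it relies on the predicate being called for one element at a time within a group. It covers only the size condition, not the timeout.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Predicate;

// Hypothetical stateful boundary for bufferUntil(predicate, true); one instance per group.
final class Boundary implements Predicate<String> {
    private final int maxBytes;
    private int bufferedBytes; // UTF-8 bytes in the buffer currently being filled

    Boundary() {
        this(1024); // assumed default threshold; the real value is application-specific
    }

    Boundary(int maxBytes) {
        this.maxBytes = maxBytes;
    }

    @Override
    public boolean test(String value) {
        int size = value.getBytes(StandardCharsets.UTF_8).length;
        if (bufferedBytes > 0 && bufferedBytes + size > maxBytes) {
            // Cut before this element: it becomes the first element of the next buffer.
            bufferedBytes = size;
            return true;
        }
        bufferedBytes += size;
        return false;
    }
}
```

With this version a buffer is closed just before it would exceed the threshold, and a single value larger than the threshold ends up in a buffer of its own.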
Managing and using the pipeline:
    public void start() {
        initializePipeline();
    }

    public void batch(String key, String value) {
        bridge.emit(new Pair(key, value));
    }

    public void end() {
        bridge.complete();
    }