moksie
moksie

Reputation: 1

What is a good way to buffer elements using project-reactor where the boundary is the size in bytes not the number of elements?

I have a use case to implement where I'll receive a continuous stream of key/value pairs. I need to group and accumulate the values by the key until the size of the accumulation in bytes reaches a predefined threshold. When the threshold is met for a key, it should emit the buffer and restart buffering. Along with this size based boundary condition it should also consider a timeout. When the timeout elapses, it should emit the batch and restart accumulating.

Using one of the bufferXXX or windowXXX methods in Flux would potentially solve the problem. But these implementations only consider the number of elements being queued to test the boundary condition. Another option would be to use predicate to define custom boundary. In that case it would be difficult to apply the timeout condition. Also, this implementation uses lock based synchronization which I would like to avoid.

Is there any other good way to achieve this?

Here is a simplified version of the workflow I am thinking of implementing.

Initialization of a unbounded flux

    private void initializePipeline() {
        bridge  = new Bridge<>();
        Flux.<Pair>create(sink -> bridge.sink = sink)
                .groupBy(Pair::key, Pair::value)
                .flatMap(gfp -> gfp.bufferUntil(new Boundary(), true) //
                        .doOnNext(c -> process(c)))
                .subscribe();
    }

    public record Pair(String key, String value){};

    private static class Bridge<T> {
        private FluxSink<T> sink;

        void emit(T value) {sink.next(value);}
        void error(Throwable error) {sink.error(error);}
        void complete() {sink.complete();}
    }

To Manage and utilize the pipeline.

public void start() {
    initializePipeline();
}

public void batch(String key, String value) {
    bridge.emit(new Pair(key, value));
}

public void end() {
    bridge.complete();
}

Upvotes: 0

Views: 25

Answers (0)

Related Questions