IsaacLevon

Reputation: 2570

Project Reactor: How to control Flux emission

I have a Flux that emits Dates. Each Date is mapped to 1024 simulated HTTP requests that I'm running on some Executor.

What I'd like to do is wait for all 1024 HTTP requests to complete before emitting the next Date.

Currently, when running, onNext() is called many times up front and then settles at a steady rate.

How can I change this behaviour?

P.S. I'm willing to change the architecture, if needed.

private void run() throws Exception {

    Executor executor = Executors.newFixedThreadPool(2);

    Flux<Date> source = Flux.generate(emitter ->
        emitter.next(new Date())
    );

    source
            .log()
            .limitRate(1)
            .doOnNext(date -> System.out.println("on next: " + date))
            .map(date -> Flux.range(0, 1024))
            .flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
                    .subscribeOn(Schedulers.fromExecutor(executor)))
            .subscribe(s -> System.out.println(s));

    Thread.currentThread().join();
}

HTTP request simulation:

private static String simulateHttp() {
    try {
        System.out.println("start http call");
        Thread.sleep(3_000);
    } catch (Exception e) {}

    return "HTML content";
}

EDIT: adapted code from answer:

Upvotes: 2

Views: 3766

Answers (1)

Martin Tarjányi

Reputation: 9937

You should take a look at these methods:

concatMap ensures that the elements of the Flux are processed sequentially inside the operator:

Generation of inners and subscription: this operator waits for one inner to complete before generating the next one and subscribing to it.
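To make this concrete, here is a minimal sketch of the concatMap approach, not a drop-in fix. It assumes a few things that are not in the question: the class name ConcatMapSketch, the helper batchFor, the two counters, Schedulers.boundedElastic() in place of the custom Executor, and a 20 ms sleep instead of 3 seconds. Each Date becomes one inner Flux that completes only when all of its requests have returned, and concatMap subscribes to the next inner Flux only after that.

```java
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ConcatMapSketch {

    // Illustration-only counters: how many batches (inner Fluxes) are active
    // right now, and the highest value ever observed.
    static final AtomicInteger activeBatches = new AtomicInteger();
    static final AtomicInteger maxActiveBatches = new AtomicInteger();

    // Stand-in for the question's simulateHttp(), shortened to 20 ms (assumption).
    static String simulateHttp() {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "HTML content";
    }

    // Hypothetical helper: one inner Flux per Date. It completes only after
    // every simulated request of the batch has returned.
    static Flux<String> batchFor(Date date, int requests) {
        return Flux.range(0, requests)
                .flatMap(i -> Mono.fromCallable(ConcatMapSketch::simulateHttp)
                        .subscribeOn(Schedulers.boundedElastic()))
                .doOnSubscribe(s -> maxActiveBatches.accumulateAndGet(
                        activeBatches.incrementAndGet(), Math::max))
                // doOnTerminate runs before the terminal signal reaches concatMap
                .doOnTerminate(activeBatches::decrementAndGet);
    }

    // Processes `dates` Dates, one batch at a time, and returns the number of responses.
    static long run(int dates, int requestsPerDate) {
        return Flux.<Date>generate(sink -> sink.next(new Date()))
                .take(dates)
                // the next batch is subscribed only after the previous one completes
                .concatMap(date -> batchFor(date, requestsPerDate))
                .count()
                .block();
    }

    public static void main(String[] args) {
        System.out.println("responses: " + run(3, 8));
        System.out.println("max batches active at once: " + maxActiveBatches.get());
    }
}
```

One caveat: depending on the Reactor version and the prefetch argument passed to concatMap, the source may still be asked for the next Date slightly ahead of time. What this sketch relies on is only that the next batch is not subscribed until the current one has finished.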

flatMap lets you do the same by exposing concurrency and prefetch parameters which provide you more control over this behavior:

The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel. In turn, that argument shows the size of the first Subscription.request(long) to the upstream. The prefetch argument allows to give an arbitrary prefetch size to the merged Publisher (in other words prefetch size means the size of the first Subscription.request(long) to the merged Publisher).
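As a rough sketch of the flatMap variant, assuming names that are not in the question (FlatMapConcurrencySketch, the maxParallelRequests parameter, the two counters) and Schedulers.boundedElastic() instead of the custom Executor: the outer flatMap uses concurrency 1 so only one Date's batch is subscribed at a time, and the inner flatMap uses its own concurrency argument to cap how many simulated calls are in flight.

```java
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class FlatMapConcurrencySketch {

    // Illustration-only counters for the number of simulated calls in flight.
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    // Stand-in for the question's simulateHttp(), shortened to 20 ms (assumption).
    static String simulateHttp() {
        maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            inFlight.decrementAndGet();
        }
        return "HTML content";
    }

    // Returns the total number of responses received.
    static long run(int dates, int requestsPerDate, int maxParallelRequests) {
        return Flux.<Date>generate(sink -> sink.next(new Date()))
                .take(dates)
                .flatMap(date -> Flux.range(0, requestsPerDate)
                                // inner concurrency: at most maxParallelRequests
                                // Monos are subscribed at the same time
                                .flatMap(i -> Mono.fromCallable(FlatMapConcurrencySketch::simulateHttp)
                                                .subscribeOn(Schedulers.boundedElastic()),
                                        maxParallelRequests),
                        // outer concurrency 1: one batch at a time, and the first
                        // request to the upstream is of size 1
                        1)
                .count()
                .block();
    }

    public static void main(String[] args) {
        System.out.println("responses: " + run(3, 8, 4));
        System.out.println("max calls in flight: " + maxInFlight.get());
    }
}
```

With concurrency 1 on the outer flatMap the batches are handled one after another, much like concatMap; raising that number would let several Dates be processed in parallel if that ever becomes desirable.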

Upvotes: 5
