viq
viq

Reputation: 417

Reactor 3 Emitter/Subscriber paralllel

i'm new in Reactive programming and have a lot of questions. I think it is not a lack of examples or documentation it is just my understanding is wrong.

I'm trying to emulate slow subscriber;

Here is the code example

Flux.create(sink -> {
    int i = 0;
    while (true) {
        try {
            System.out.println("Sleep for " + MILLIS);
            Thread.sleep(MILLIS);
            int it = i++;
            System.out.println("Back to work, iterator " + it);
            sink.next(it);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}).subscribeOn(Schedulers.elastic())
.subscribe(x -> {
    try {
        System.out.println("Value: " + x + ", Thread: " + Thread.currentThread().toString());
        Thread.sleep(MILLIS + 4000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

System out is

Sleep for 1000
Back to work, iterator 0
Value: 0, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 1
Value: 1, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 2
Value: 2, Thread: Thread[elastic-2,5,main]

I thought if subscriber is slow, i should see more threads due to Schedulers.elastic()

Also i tried to make publishOn() and it seems like i make it async, but still couldn't handle result in several threads.

Thanks for comments and answers.

Upvotes: 2

Views: 147

Answers (1)

Ricard Kollcaku
Ricard Kollcaku

Reputation: 1712

If you want it to run in diferent threads you need to use .parallel() like this and the emit will be don in different thread

Flux.create(sink -> {
        int i = 0;
        while (true) {
            try {
                System.out.println("Sleep for " + MILLIS);
                Thread.sleep(100);
                int it = i++;
                System.out.println("Back to work, iterator " + it);
                sink.next("a");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    })

            .parallel()
            .runOn(Schedulers.elastic())

            .subscribe(x -> {
                try {
                    System.out.println("Value: " + x + ", Thread: " + Thread.currentThread().toString());
                    Thread.sleep(100 + 4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            })
    ;
}

Upvotes: 1

Related Questions