Reputation: 417
i'm new in Reactive programming and have a lot of questions. I think it is not a lack of examples or documentation it is just my understanding is wrong.
I'm trying to emulate slow subscriber;
Here is the code example
Flux.create(sink -> {
int i = 0;
while (true) {
try {
System.out.println("Sleep for " + MILLIS);
Thread.sleep(MILLIS);
int it = i++;
System.out.println("Back to work, iterator " + it);
sink.next(it);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.elastic())
.subscribe(x -> {
try {
System.out.println("Value: " + x + ", Thread: " + Thread.currentThread().toString());
Thread.sleep(MILLIS + 4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System out is
Sleep for 1000
Back to work, iterator 0
Value: 0, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 1
Value: 1, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 2
Value: 2, Thread: Thread[elastic-2,5,main]
I thought if subscriber is slow, i should see more threads due to Schedulers.elastic()
Also i tried to make publishOn()
and it seems like i make it async, but still couldn't handle result in several threads.
Thanks for comments and answers.
Upvotes: 2
Views: 147
Reputation: 1712
If you want it to run in diferent threads you need to use .parallel() like this and the emit will be don in different thread
Flux.create(sink -> {
int i = 0;
while (true) {
try {
System.out.println("Sleep for " + MILLIS);
Thread.sleep(100);
int it = i++;
System.out.println("Back to work, iterator " + it);
sink.next("a");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
})
.parallel()
.runOn(Schedulers.elastic())
.subscribe(x -> {
try {
System.out.println("Value: " + x + ", Thread: " + Thread.currentThread().toString());
Thread.sleep(100 + 4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
})
;
}
Upvotes: 1