Reputation: 1145
Well, backpressure in RxJava is not real backpressure, but only ignoring some sets of elements.
But what if I cannot loose any elements and I need to slow emition somehow?
RxJava cannot affect element emition, so developer needs to implement it by himself. But how?
The simpliest way comes to mind is to use some counter with incrementing on emition and decrementing on finishing.
Like that:
public static void sleep(int ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));
Observable.create(s -> {
while (!s.isUnsubscribed()) {
if (counter.get() < 100) {
s.onNext(Math.random());
counter.incrementAndGet();
} else {
sleep(100);
}
}
}).subscribeOn(sA)
.flatMap(r ->
Observable.just(r)
.subscribeOn(sB)
.doOnNext(x -> sleep(1000))
.doOnNext(x -> counter.decrementAndGet())
)
.subscribe();
}
But I think this way is very poor. Is there any better solutions?
Upvotes: 2
Views: 1185
Reputation: 69997
Well, backpressure in RxJava is not real backpressure
RxJava's backpressure implementation is a non-blocking cooperation between subsequent producers and consumers through a request channel. The consumer asks for some amount of elements via request()
and the producers creates/generates/emits at most that amount of items via onNext
, sometimes with delays between onNext
s.
but only ignoring some sets of elements.
This happens only when you explicitly tell RxJava to drop any overflow.
RxJava cannot affect element emition, so developer needs to implement it by himself. But how?
Using Observable.create
requires advanced knowledge of how non-blocking backpressure can be implemented and practically it is not recommended to library users. RxJava has plenty of ways to give you backpressure-enabled flows without complications:
Observable.range(1, 100)
.map(v -> Math.random())
.subscribeOn(sA)
.flatMap(v ->
Observable.just(v).subscribeOn(sB)
.doOnNext(x -> sleep(1000))
)
.subscribe();
or
Observable.create(SyncOnSubscribe.createStateless(
o -> o.onNext(Math.random())
)
.subscribeOn(sA)
...
Upvotes: 3
Reputation: 17711
As you noted yourself, this actually has nothing to do with RxJava.
If you must process all events eventually, but you want to do that at your own pace, use queues:
ExecutorService emiter = Executors.newSingleThreadExecutor();
ScheduledExecutorService workers = Executors.newScheduledThreadPool(4);
BlockingQueue<String> events = new LinkedBlockingQueue<>();
emiter.submit(() -> {
System.out.println("I'll send 100 events as fast as I can");
for (int i = 0; i < 100; i++) {
try {
events.put(UUID.randomUUID().toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
workers.scheduleWithFixedDelay(
() -> {
String result = null;
try {
result = events.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("I don't care, got %s only now", result));
}, 0, 1, TimeUnit.SECONDS
);
Upvotes: -1