Reputation: 4216
I have a piece of code who's work is to update a local cache. There are two triggers to this cache update:
So here's a basic example on how I did this.
forceReloadEvents = new SerializedSubject<Long, Long>(PublishSubject.<Long> create());
dataUpdates = Observable
.merge(forceReloadEvents, Observable.timer(0, pullInterval, TimeUnit.SECONDS))
.flatMap(new Func1<Long, Observable<Boolean>>() {
@Override
public Observable<Boolean> call(Long t) {
return reloadData(); // operation that may take long
}
})
.publish();
dataUpdates.subscribe();
dataUpdates.connect();
Then later i have
public void forceReload() {
final CountDownLatch cdl = new CountDownLatch(1);
dataUpdates
.take(1)
.subscribe(
new Action1<Boolean>() {
@Override
public void call(Boolean b) {
cdl.countDown();
}
}
);
forceReloadEvents.onNext(-1L);
try {
cdl.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
This works but the problem is when I start to have multiple concurrent calls to forceReload()
: There will be no concurrent execution of reloadData()
but the elements will queue up and the process will loop on reloading data until all the events sent to forceReloadEvents
have been consumed even though forceReload()
already completed due to previous events releasing the CountDownLatch
.
I wanted to use onBackPressureDrop
but it seems there's no induced backpressure and nothing is dropped. What I'd like is some way to force backpressure so that the merge understands that only one element can be processed at a time and that any subsequent event must be dropped until the current execution is done.
I thought about using buffer
or throttleFirst
too but I don't want to force a specific time between each event and i'd rather have this auto-scaling depending on the time it takes to reload the cache. You can think of it as throttleFirst
until reloadData
has completed.
Upvotes: 1
Views: 564
Reputation: 4216
I made this work thanks to akarnokd!
Here's the solution I created based on his answer:
Observable<Long> forceReloadEvents = this.forceReloadEvents
.asObservable()
.onBackpressureDrop();
Observable<Long> periodicReload = Observable
.timer(0, pullInterval, TimeUnit.SECONDS)
.onBackpressureDrop();
final AtomicBoolean running = new AtomicBoolean();
dataUpdates = Observable
.merge(forceReloadEvents, periodicReload)
.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long t) {
return running.compareAndSet(false, true);
}
})
.observeOn(Schedulers.io())
.flatMap(new Func1<Long, Observable<Boolean>>() {
@Override
public Observable<Boolean> call(Long t) {
return reloadData();
}
})
.doOnNext(new Action1<Boolean>() {
@Override
public void call(Boolean t) {
running.set(false);
}
})
.publish();
dataUpdates.subscribe();
dataUpdates.connect();
I'm not sure onBackpressureDrop
is usefull here but I set it as a precaution.
The forceReload code does not change.
Upvotes: 2
Reputation: 69997
Edit: based on the comments, you can have an AtomicBoolean as a gate in the flatMap to not start a reload until the gate is open again:
public class AvoidReloadStorm {
static Observable<Boolean> reload() {
return Observable.just(true)
.doOnNext(v -> System.out.println("Reload started..."))
.delay(10, TimeUnit.SECONDS)
.doOnNext(v -> System.out.println("Reloaded"));
}
public static void main(String[] args) throws Exception {
Subject<Long, Long> manual = PublishSubject.<Long>create().toSerialized();
Observable<Long> timer = Observable.timer(0, 5, TimeUnit.SECONDS)
.doOnNext(v -> System.out.println("Timer reload"));
AtomicBoolean running = new AtomicBoolean();
ConnectableObservable<Boolean> src = Observable
.merge(manual.onBackpressureDrop(), timer.onBackpressureDrop())
.observeOn(Schedulers.io())
.flatMap(v -> {
if (running.compareAndSet(false, true)) {
return reload().doOnCompleted(() -> {
running.set(false);
});
}
System.out.println("Reload rejected");
return Observable.empty();
}).publish();
src.subscribe(System.out::println);
src.connect();
Thread.sleep(100000);
}
}
Upvotes: 2