Reputation: 11308
I have to poll some RESTful endpoint periodically to refresh my android app's data. I also have to pause and resume it based on connectivity (if the phone is offline, there's no need to even try). My current solution is working, but it uses standard Java's ScheduledExecutorService
to perform periodic tasks, but I'd like to stay in Rx paradigm.
Here's my current code, parts of which are skipped for brevity.
userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() {
@Override
public void call(final Subscriber<? super UserProfile> subscriber) {
final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
final Runnable runnable = new Runnable() {
@Override
public void run() {
// making http request here
}
};
final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1);
networkStatusObservable.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean networkAvailable) {
if (!networkAvailable) {
pause();
} else {
pause();
futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS));
}
}
private void pause() {
for (ScheduledFuture<?> future : futures) {
future.cancel(true);
}
futures.clear();
}
});
final Subscription subscription = new Subscription() {
private boolean isUnsubscribed = false;
@Override
public void unsubscribe() {
scheduledExecutorService.shutdownNow();
isUnsubscribed = true;
}
@Override
public boolean isUnsubscribed() {
return isUnsubscribed;
}
};
subscriber.add(subscription);
}
}).multicast(BehaviorSubject.create()).refCount();
networkStatusObservable
is basically a broadcast receiver wrapped into Observable<Boolean>
, indicating that the phone is connected to the network.
As I said, this solution is working, but I want to use Rx approach for periodic polling and emitting new UserProfile
s, because there are numerous problems with scheduling things manually, which I want to avoid. I know about Observable.timer
and Observable.interval
, but can't figure out how to apply them to this task (and I'm not sure if I need to use those at all).
Upvotes: 23
Views: 31678
Reputation: 1015
Short answer. RxJava2:
Observable.interval(initialDelay, unitAmount, timeUnit)
.subscribe(value -> {
// code for periodic execution
});
Choose initialDelay, unitAmount and TimeUnit according to your needs.
Example: 0, 1, TimeUnit.MINUTES.
Upvotes: 5
Reputation: 2149
There is a simpler way to do it by using interval(). I have tested this code and it works. But first, you should encapsulate the job you want to periodically execute in a subclass of Action1.
class Act<T> implements Action1<T> {
public Service service;
public String data;
public void call(T t){
service.log(data); //the periodic job
}
}
(I have kept the fields public for brevity, but that isn't advisable). Now you can schedule it the following way:
Act<Long> act=new Act<>();
act.data="dummy data";
act.service=this;
Observable.interval(0l, period, TimeUnit.SECONDS).subscribeOn(Schedulers.from(Executors.newFixedThreadPool(10))).subscribe((Action1<Long>)act);
This will not block your threads anywhere, unlike the approach given in the other answer. This approach allows us to pass a variable as a kind of mutable storage inside the Action which could be handy in subsequent invocations. Also, this way you could subscribe your call on your own thread pool.
Upvotes: 3
Reputation: 12477
There are a few approaches on this GitHub issue that you might find helpful.
https://github.com/ReactiveX/RxJava/issues/448
The three implementations are:
Observable.interval(delay, TimeUnit.SECONDS).timeInterval()
.flatMap(new Func1<Long, Observable<Notification<AppState>>>() {
public Observable<Notification<AppState>> call(Long seconds) {
return lyftApi.updateAppState(params).materialize(); } });
Scheduler.schedulePeriodically
Observable.create({ observer ->
Schedulers.newThread().schedulePeriodically({
observer.onNext("application-state-from-network");
}, 0, 1000, TimeUnit.MILLISECONDS);
}).take(10).subscribe({ v -> println(v) });
Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(final Observer<? super String> o) {
return Schedulers.newThread().schedule(0L, new Func2<Scheduler, Long, Subscription>() {
@Override
public Subscription call(Scheduler inner, Long t2) {
o.onNext("data-from-polling");
return inner.schedule(t2, this, 1000, TimeUnit.MILLISECONDS);
}
});
}
}).toBlockingObservable().forEach(new Action1<String>() {
@Override
public void call(String v) {
System.out.println("output: " + v);
}
});
And the conclusion is that manual recursion is the way to go because it waits until the operation is completed before scheduling the next execution.
Upvotes: 29
Reputation: 11308
Okay, I'll post my own solution, maybe someone will benefit from it. I'll only post the part related to the question, omitting the HTTP and caching stuff. Here's how I do it:
private ConnectableObservable<Long> createNetworkBoundHeartbeatObservable(final Observable<Boolean> networkStatusObservable,
final Observable<Boolean> pauseResumeObservable) {
final Observable<Boolean> pausableHeartbeatObservable = Observable.combineLatest(networkStatusObservable, pauseResumeObservable,
new Func2<Boolean, Boolean, Boolean>() {
@Override
public Boolean call(Boolean networkAvailable, Boolean mustPause) {
return mustPause && networkAvailable;
}
}
).distinctUntilChanged();
final Observable<Boolean> hasToResumeObservable = pausableHeartbeatObservable.filter(new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean networkAvailable) {
return networkAvailable;
}
});
final Observable<Boolean> hasToStopObservable = pausableHeartbeatObservable.filter(new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean networkAvailable) {
return !networkAvailable;
}
});
return pausableHeartbeatObservable.concatMap(new Func1<Boolean, Observable<Long>>() {
@Override
public Observable<Long> call(Boolean shouldResumeRequests) {
if (shouldResumeRequests) {
long timeToUpdate;
final Date oldestModifiedExpiresAt = cache.oldestModifiedExpiresAt();
timeToUpdate = Math.max(0, oldestModifiedExpiresAt.getTime() - System.currentTimeMillis());
Log.d(TAG, String.format("Have to restart updates, %d seconds till next update", timeToUpdate / SECOND_IN_MILLIS));
return Observable
.timer(timeToUpdate, SECONDS_TO_EXPIRE * SECOND_IN_MILLIS, TimeUnit.MILLISECONDS)
.takeUntil(hasToStopObservable);
} else {
Log.d(TAG, "Have to pause updates");
return Observable.<Long>never().takeUntil(hasToResumeObservable);
}
}
}).multicast(PublishSubject.<Long>create());
}
As you can see, the conditions to pause or resume updates become a bit more complicated, with a new Observable added to support pausing when app goes to background.
Then at the core of the solution is the concatMap
operation which emits the Observables
sequentially (hence concatMap, not flatMap, see this question: What is the difference between concatMap and flatMap in RxJava). It emits either interval
or never
Observables
, depending on whether updates should be continued or paused. Then every emitted Observable
is takenUntil
'an opposite' Observable
emits new value.
ConnectableObservable
is returned because the created Observable
is hot, and all the intended subscribers have to subscribe to it before it starts emitting something, otherwise initial events could be lost. I call connect
on it later.
I'll accept either my or another answer based on votes, if any.
Upvotes: 1
Reputation: 2504
One of options is to use Observable.interval and checking the user state when the intervals are emitted:
Observable<Long> interval = Observable.interval(1, TimeUnit.SECONDS);
//pulling the user data
Observable<Observable<String>> userObservable = interval.map(new Func1<Long, Observable<String>>() {
Random random = new Random();
@Override
public Observable<String> call(Long tick) {
//here you are pulling user data; you should do it asynchronously - rx way - because the interval is using Schedulers.computation which is not best suited for doing io operations
switch(random.nextInt(10)){
case 0://suppose this is for cases when network in not available or exception happens
return Observable.<String>just(null);
case 1:
case 2:
return Observable.just("Alice");
default:
return Observable.just("Bob");
}
}
});
Observable<String> flatUsers = userObservable.flatMap(new Func1<Observable<String>, Observable<? extends String>>() {
@Override
public Observable<? extends String> call(Observable<String> stringObservable) {
return stringObservable;
}
});
//filter valid data
Observable<String> usersWithoutErrors = flatUsers.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s != null;
}
});
//publish only changes
Observable<String> uniqueUsers = usersWithoutErrors.distinctUntilChanged();
You can do it even simpler if your networkStatusObservable is emitting events at least as frequently as you need to check user data
networkStatusObservable.sample(1,TimeUnit.Seconds).filter(/*the best is to filter only connected state */).map(/*now start pulling the user data*/)
Finally you can can create observable which uses scheduler to emit the user states periodically - refer to Schedulers documentation to learn which scheduler fit you needs the best:
public abstract class ScheduledOnSubscribe<T> implements Observable.OnSubscribe<T>{
private final Scheduler scheduler;
private final long initialDelay;
private final long period;
private final TimeUnit unit;
public ScheduledOnSubscribe(Scheduler scheduler, long initialDelay, long period, TimeUnit unit) {
this.scheduler = scheduler;
this.initialDelay = initialDelay;
this.period = period;
this.unit = unit;
}
abstract T next() throws Exception;
@Override
public void call(final Subscriber<? super T> subscriber) {
final Scheduler.Worker worker = scheduler.createWorker();
subscriber.add(worker);
worker.schedulePeriodically(new Action0() {
@Override
public void call() {
try {
subscriber.onNext(next());
} catch (Throwable e) {
try {
subscriber.onError(e);
} finally {
worker.unsubscribe();
}
}
}
}, initialDelay, period, unit);
}
}
//And here is the sample usage
Observable<String> usersObservable = Observable.create(new ScheduledOnSubscribe(Schedulers.io(), 1, 1, TimeUnit.SECONDS ){
Random random = new Random();
@Override
String next() throws Exception {
//if you use Schedulers.io, you can call the remote service synchronously
switch(random.nextInt(10)){
case 0:
return null;
case 1:
case 2:
return "Alice";
default:
return "Bob";
}
}
});
Upvotes: 6