Reputation: 575
I have an Observable that represents a sequence selected from DB table, so it is finite.
Observable<Item> selectResults() { ... }
I would like to implement a pulling with a specified interval, so at the end I will end up with another observable that will wrap my original one and pull indefinitely.
I just don't know how to do it :(
Ok, this is my idea what to do, modeled around interval observable, probably needs error handling and unsubscribe logic.
public class OnSubscribePeriodicObservable implements OnSubscribe<Item> {
...
@Override
public void call(final Subscriber<? super Item> subscriber) {
final Worker worker = scheduler.createWorker();
subscriber.add( worker );
worker.schedulePeriodically(new Action0() {
@Override
public void call() {
selectResults().subscribe( new Observer<Item>() {
@Override
public void onCompleted() {
//continue
}
@Override
public void onError(Throwable e) {
subscriber.onError( e );
}
@Override
public void onNext(Item t) {
subscriber.onNext( t );
}
});
}
}, initialDelay, period, unit);
}
Upvotes: 2
Views: 173
Reputation: 69997
You can accomplish this with standard operators which will give you error propagation, unsubscription and backpressure for cheap:
Observable<Integer> databaseQuery = Observable
.just(1, 2, 3, 4)
.delay(500, TimeUnit.MILLISECONDS);
Observable<Integer> result = Observable
.timer(1, 2, TimeUnit.SECONDS)
.onBackpressureDrop()
.concatMap(t -> databaseQuery);
result.subscribe(System.out::println);
Thread.sleep(10000);
Upvotes: 1