user2649908

Reputation: 575

Infinite observable from another observable

I have an Observable that represents a sequence selected from a DB table, so it is finite.

Observable<Item> selectResults() { ... }

I would like to implement polling at a specified interval, so that I end up with another observable that wraps my original one and polls indefinitely.

I just don't know how to do it :(


OK, this is my idea of what to do, modeled on the interval observable. It probably still needs error handling and unsubscribe logic.

public class OnSubscribePeriodicObservable implements OnSubscribe<Item> {
...

  @Override
  public void call(final Subscriber<? super Item> subscriber) {
      final Worker worker = scheduler.createWorker();
      // Unsubscribing the downstream subscriber also cancels the periodic task
      subscriber.add( worker );

      worker.schedulePeriodically(new Action0() {
          @Override
          public void call() {
              // Re-run the finite query on every tick
              selectResults().subscribe( new Observer<Item>() {
                  @Override
                  public void onCompleted() {
                      // continue; the next tick runs the query again
                  }

                  @Override
                  public void onError(Throwable e) {
                      subscriber.onError( e );
                  }

                  @Override
                  public void onNext(Item t) {
                      subscriber.onNext( t );
                  }
              });
          }

      }, initialDelay, period, unit);
  }
}

Upvotes: 2

Views: 173

Answers (1)

akarnokd

Reputation: 69997

You can accomplish this with standard operators, which give you error propagation, unsubscription, and backpressure almost for free:

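// Stand-in for the finite database query
Observable<Integer> databaseQuery = Observable
    .just(1, 2, 3, 4)
    .delay(500, TimeUnit.MILLISECONDS);

Observable<Integer> result = Observable
    // first tick after 1 second, then one tick every 2 seconds
    .timer(1, 2, TimeUnit.SECONDS)
    // drop ticks that arrive while downstream is not requesting
    .onBackpressureDrop()
    // run the query once per tick, one query at a time
    .concatMap(t -> databaseQuery);

result.subscribe(System.out::println);

// keep the main thread alive long enough to see a few rounds
Thread.sleep(10000);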
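
To make the behavior of that chain easier to picture, here is a minimal sketch in plain Java with no RxJava dependency. It is not the operator chain itself, only an illustration of the same idea: a periodic tick re-runs a finite query and forwards every item to a consumer, and two rounds never overlap. The names `PollingSketch`, `poll`, `rounds`, and `periodMs` are hypothetical, the query is modeled as a `Supplier<List<T>>`, and the loop stops after a fixed number of rounds so the example terminates. Unlike the operators above, it has no error propagation or backpressure.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

class PollingSketch {

    // Hypothetical helper: runs the finite `query` once per tick, `rounds` times,
    // forwards each item to `sink`, then shuts the scheduler down.
    static <T> void poll(Supplier<List<T>> query, Consumer<? super T> sink,
                         int rounds, long periodMs) {
        // One thread means rounds run strictly one after another
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(rounds);

        scheduler.scheduleAtFixedRate(() -> {
            if (remaining.getCount() == 0) {
                return; // ignore any tick that fires after the last round
            }
            query.get().forEach(sink); // emit the whole finite result
            remaining.countDown();
        }, 0, periodMs, TimeUnit.MILLISECONDS);

        try {
            remaining.await(); // block the caller until all rounds are done
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Polls a stand-in "query" three times, 100 ms apart, printing each item
        PollingSketch.<Integer>poll(() -> Arrays.asList(1, 2, 3, 4),
                System.out::println, 3, 100);
    }
}
```

In the RxJava version, the fixed round count is unnecessary: the subscription keeps polling until the subscriber unsubscribes.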

Upvotes: 1
