tbo

Reputation: 9738

RxJava 2 Observable that onComplete resubmits itself

I'm new to RxJava. I'm trying to create an Observable that starts over each time it completes, until I call dispose, but after a while I get an OutOfMemoryError. Below is a simplified example of what I'm trying to do:

    public void start() throws RuntimeException {
        log.info("\t * Starting {} Managed Service...", getClass().getSimpleName());

        try {

            executeObserve();

            log.info("\t * Starting {} Managed Service...OK!", getClass().getSimpleName());
        } catch (Exception e) {
            log.info("Managed Service {} FAILED! Reason is {} ", getClass().getSimpleName(), e.getMessage(), e);
        }
    }

start is invoked once, during the initialization phase. executeObserve is as follows (in a simplified form). Notice that in onComplete I "resubmit" executeObserve:

public void executeObserve() throws RuntimeException {

    Observable<Book> booksObserve = manager.getAsObservable();

    booksObserve
            .map(Book::getAllOrders)
            .flatMap(Observable::fromIterable)
            .toList()
            .subscribeOn(Schedulers.io())
            .subscribe(collectedISBN ->
                    // a second, nested Observable is created inside the subscriber
                    Observable.fromIterable(collectedISBN)
                            .buffer(10)
                            // ...some more steps here...
                            .toList()
                            .toObservable()
                            // resubmit
                            .doOnComplete(this::executeObserve)
                            .subscribe(validISBN -> {
                                // do something with the valid ones
                            })
            );
}

My guess is that this is not the right way to resubmit my tasks, but I could not find any documentation on it.

booksObserve is implemented as follows:

public Observable<Book> getAsObservable() {
    return Observable.create(e -> {
        try (CloseableResultSet<Book> rs = (CloseableResultSet<Book>) datasource.retrieveAll()) {
            for (Book r : rs) {
                e.onNext(r);
            }
            e.onComplete();
        } catch (Exception ex) {
            e.onError(ex);
        }
    });
}

What is the correct way to keep resubmitting an operation until we call dispose or an equivalent? I'm using RxJava 2.

Upvotes: 0

Views: 696

Answers (2)

tbo

Reputation: 9738

Following up on my question: repeat(), as @yosriz suggested, is the proper way to go. The following simple snippet demonstrates that the observable source is called again on each repeat:

Observable<Integer> recursiveObservable = Observable.create(emitter -> {
    System.out.println("Calling to emit data");
    Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 0).forEach(emitter::onNext);
    emitter.onComplete();
});
recursiveObservable
        .buffer(2)
        .repeat()
        .subscribe(integers -> {
            System.out.println(integers);
            TimeUnit.SECONDS.sleep(1);
        });

Upvotes: 0

yosriz

Reputation: 10267

You have created an endless recursion: every round creates more resources, and sooner or later it will fail with an OutOfMemoryError or a StackOverflowError.

To repeat the Observable's work you should use the repeat() operator; it resubscribes to the Observable when it receives onComplete().
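To make the difference concrete, here is a plain-Java model (no RxJava involved) that contrasts resubscribing from inside the completion callback with resubscribing from a loop. Everything in it (`ResubscribeModel`, `Source`, `resubmitOnComplete`, `repeatInLoop`) is a made-up name for illustration, and the source is assumed to be fully synchronous. It is not how RxJava implements repeat(), and in the question's code the work hops to Schedulers.io(), so what actually piles up there may look different. It only shows that the first pattern keeps every earlier round alive while the next one runs.

```java
public class ResubscribeModel {
    // Hypothetical stand-in for an Observable: do the work, then signal completion.
    interface Source { void subscribe(Runnable onComplete); }

    static int active;     // subscriptions currently in progress
    static int maxActive;  // the most that were in progress at the same time

    // A synchronous source: it signals completion before subscribe() returns.
    static final Source SOURCE = onComplete -> {
        active++;
        maxActive = Math.max(maxActive, active);
        onComplete.run();
        active--;
    };

    // Pattern from the question: start the next round from the completion callback.
    static void resubmitOnComplete(int remaining) {
        SOURCE.subscribe(() -> {
            if (remaining > 1) {
                resubmitOnComplete(remaining - 1);
            }
        });
    }

    // Repeat-style pattern: start the next round only after the previous one has finished.
    static void repeatInLoop(int times) {
        for (int i = 0; i < times; i++) {
            SOURCE.subscribe(() -> { });
        }
    }

    // Runs a scenario from a clean slate and reports the peak number of live subscriptions.
    static int maxActiveFor(Runnable scenario) {
        active = 0;
        maxActive = 0;
        scenario.run();
        return maxActive;
    }

    public static void main(String[] args) {
        System.out.println(maxActiveFor(() -> resubmitOnComplete(100))); // prints 100
        System.out.println(maxActiveFor(() -> repeatInLoop(100)));       // prints 1
    }
}
```

In this model the callback-driven version has as many rounds in flight as it has run, while the loop never has more than one.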

Besides that, some general comments on your code:

  • Why are you nesting the second Observable inside the subscriber? That breaks the chain; you can simply continue the chain instead of creating a new Observable in the subscriber.
  • Moreover, it seems (assuming Observable.fromIterable(collectedISBN) uses the collectedISBN received in onNext(); otherwise, where does it come from?) that you are collecting all items into a list and then flattening it again with fromIterable, so you can just continue the stream, something like this:

    booksObserve
       .map(Book::getAllOrders)
       .flatMap(Observable::fromIterable)
       .buffer(10)
       // ...some more steps here...
       .toList()
       .toObservable()
       // resubmit
       .doOnComplete(this::executeObserve)
       .subscribeOn(Schedulers.io())
       .subscribe(validISBN -> {
             // do something with the valid ones
       });
    
  • In any case, with the nested Observable, the repeat() operator would repeat only the nested one, not the entire stream (which is what you want), because the nested one is not connected to it.
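Putting these points together, one possible shape is a single chain that ends in repeat() instead of the doOnComplete resubmit, with the returned Disposable kept so the loop can be stopped. The following is only a sketch against RxJava 2 (`io.reactivex`): `RepeatingChainSketch`, `books()` and the integer "order ids" are stand-ins I made up for manager.getAsObservable() and the Book type, and the `subscriptions` counter exists purely to show that the source is subscribed again on each round.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RepeatingChainSketch {
    // Counts how often the source is (re)subscribed; for illustration only.
    static final AtomicInteger subscriptions = new AtomicInteger();

    // Hypothetical stand-in for manager.getAsObservable(): each "book" is a list of order ids.
    static Observable<List<Integer>> books() {
        return Observable.create(emitter -> {
            subscriptions.incrementAndGet();
            emitter.onNext(Arrays.asList(1, 2));
            emitter.onNext(Arrays.asList(3, 4));
            emitter.onComplete();
        });
    }

    // One chain, no nested subscribe: flatten the orders, batch them,
    // and resubscribe to the whole upstream whenever it completes.
    static Observable<List<Integer>> batches() {
        return books()
                .flatMap(Observable::fromIterable)
                .buffer(2)
                .repeat();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger seen = new AtomicInteger();
        Disposable loop = batches()
                .subscribeOn(Schedulers.io())
                .subscribe(batch -> seen.incrementAndGet(), Throwable::printStackTrace);

        Thread.sleep(100);
        loop.dispose(); // stops the repetition
        System.out.println("batches seen before dispose: " + seen.get());
    }
}
```

Because repeat() sits at the end of the connected chain, disposing the single Disposable is enough to stop both the current round and any further resubscription.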

Upvotes: 2
