Abdullah Bashir

Reputation: 53

RxJava Combining Multiple Observer after filter

The following is my current code:

  private final List<Disposable> subscriptions = new ArrayList<>();

  for (Instrument instrument : instruments) {
    // Wait for the order book to produce reliable results.
    GenericBook book =
        service
            .getBook(instrument.getData())
            .filter(gob -> onBookUpdate(gob))
            .blockingFirst();

    subscriptions.add(
        service
            .getBook(instrument.getData())
            .subscribe(
                gob -> {
                  try {
                    onBookUpdate(gob);
                  } catch (Exception e) {
                    logger.error("Error on subscription:", e);
                  }
                },
                e -> logger.error("Error on subscription:", e)));
  }

For each instrument, this first blocks until onBookUpdate(gob), which returns a boolean, returns true. Once the first onBookUpdate call has returned true, I add the subscription to the subscriptions list.

This is slow, because I have to wait for each instrument before moving on to the next one.

My goal is to run all of these in parallel, wait for all of them to finish, and then add them to the subscriptions list.

I tried zip, but it didn't work:

  List<Observable<GenericOrderBook>> obsList = null;
  for (Instrument instrument : instruments) {
    // This throws a NullPointerException.
    obsList.add(
        service
            .getBook(instrument.getData())
            .filter(gob -> onBookUpdate(gob))
            .take(1));
  }
  // Somehow wait here until every instrument has had its first onBookUpdate return true.
  String o = Observable.zip(obsList, (i) -> i[0]).blockingLast();

Upvotes: 1

Views: 488

Answers (2)

arungiri_10

Reputation: 988

I am assuming instruments is a List. If so, you can do something like this:

Observable
    .fromIterable(instruments)
    // Emits the instruments one by one and passes each to getBook()
    .flatMap(
        instrument -> service.getBook(instrument.getData())
    )
    .filter(
        gob -> onBookUpdate(gob)
    )
    // onComplete will be called if no items pass the filter
    .switchIfEmpty(Observable.empty())
    .subscribe(
        onBookUpdateResponse -> { /* Do what you want */ },
        error -> logger.error("Error on subscription:", error)
    );

Hope this helps.

Upvotes: 0

Michael Wiles

Reputation: 21186

When using observables etc, one should embrace them wholeheartedly. One of the premises for embracing is to separate the configuration and construction of your pipeline from its execution.

In other words, configure your pipeline upfront and then, when the data is available, send the data through it.

Furthermore, embracing observables implies avoiding for-loops.

I'm not 100% sure what your use case is, but what I'd suggest is to create a pipeline that takes an instrument as input and returns a subscription.

So something like

service.getBook(instrument.getData())
 // map, not flatMap: the lambda returns the item itself, not an Observable
 .map(gob -> {
   onBookUpdate(gob);
   return gob;
 });

That will return an Observable that you can subscribe to and add the result to the subscriptions.

Then create a seed observable that pumps the instrument objects into it.
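
To make the "configure first, execute later" shape concrete, here is a rough sketch that deliberately uses only the JDK's CompletableFuture as a stand-in, not RxJava itself. Everything in it is an assumption for illustration: instruments are plain strings, updates are integers, `updatesFor` is a hypothetical replacement for service.getBook, and `isReliable` is a hypothetical replacement for onBookUpdate. The per-instrument step plays roughly the role of filter(...).take(1) from the question, and the single wait at the end is the "wait for all of them" part.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class ParallelFirstMatch {

    // Hypothetical stand-in for service.getBook(...): a finite list of updates.
    static List<Integer> updatesFor(String instrument) {
        int n = instrument.length();
        return List.of(n, n * 10, n * 100);
    }

    // Hypothetical stand-in for onBookUpdate(gob): true once an update is "reliable".
    static boolean isReliable(int update) {
        return update >= 20;
    }

    // Configuration: a reusable pipeline from an instrument to the future of
    // its first accepted update. Nothing runs until the function is applied.
    static Function<String, CompletableFuture<Integer>> pipeline(Predicate<Integer> accept) {
        return instrument -> CompletableFuture.supplyAsync(() ->
                updatesFor(instrument).stream()
                        .filter(accept)
                        .findFirst()
                        .orElseThrow(() -> new IllegalStateException(
                                "no reliable update for " + instrument)));
    }

    // Execution: start every instrument first, then wait once for all of them.
    static List<Integer> firstReliableForAll(List<String> instruments) {
        Function<String, CompletableFuture<Integer>> p =
                pipeline(ParallelFirstMatch::isReliable);
        List<CompletableFuture<Integer>> pending = instruments.stream()
                .map(p) // all futures are created (and started) before any join
                .collect(Collectors.toList());
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        // Every future is complete here, so join() returns immediately.
        return pending.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstReliableForAll(List.of("AB", "XYZ", "LONGER")));
    }
}
```

The point of the sketch is the ordering: no instrument waits on another, and the only blocking call sits after everything has been started. Once that wait returns, you would set up the long-lived subscriptions and add them to your list.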

Not sure of some of the details of your API, so come back to me if this is not clear or I've made a wrong assumption.

Upvotes: 2
