Jeff

Reputation: 933

Making an RxJava Operator Chain Concurrent

I've just started using RxJava. I've been trying to build a data pipeline that downloads large amounts of data from different sources and inserts that data into a database concurrently.

My basic pipeline looks like this:

        Observable.range(1, 5)
            .concatMap((i) -> {
                return Observable.range(i, 2);
            })
            .concatMap((i) -> {
                return Observable.range(i, 2);
            })
            .subscribe((i) -> { System.out.println(i); }, System.out::println,() -> { System.out.println("Complete"); });

Whenever I add observeOn, nothing prints at all, instead of all the numbers that the version above printed. Why is this? I would expect the next concatMap and the subscribe to simply run on the computation scheduler as well. The code is below.

        Observable.range(1, 5)
            .concatMap((i) -> {
                return Observable.range(i, 2);
            })
            .observeOn(Schedulers.computation())
            .concatMap((i) -> {
                return Observable.range(i, 2);
            })
            .subscribe((i) -> { System.out.println(i); }, System.out::println,() -> { System.out.println("Complete"); });

Upvotes: 1

Views: 1003

Answers (2)

rafaelportela

Reputation: 306

After reading your comments to get more context, I'm assuming you want to make a request to get a list of ids (items summary), then make one additional request per returned id to get each item's detail, and then do something with the results (the items).

I'll assume you already have methods that build and fire the requests and return an Observable. If not, you can wrap the HTTP-related code and defer an Observable out of it, or create an Observable from a Future if your HTTP library returns Futures.
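A minimal sketch of both wrapping options, assuming RxJava 1.x (the rx.* packages that Observable.from and toBlocking() belong to) is on the classpath. fetchBlocking is a hypothetical stand-in for your HTTP client call, and CompletableFuture merely simulates a library that hands back Futures.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import rx.Observable;

class WrapRequests {
    // Hypothetical blocking HTTP call; replace with your real client code.
    static String fetchBlocking(String id) {
        return "payload-" + id;
    }

    // defer: the blocking call runs only when someone subscribes, once per subscription.
    static Observable<String> queryDeferred(String id) {
        return Observable.defer(() -> Observable.just(fetchBlocking(id)));
    }

    // from(Future): adapts a library that already returns Futures.
    // The request starts when the Future is created; subscribing blocks on Future.get().
    static Observable<String> queryFromFuture(String id) {
        Future<String> future = CompletableFuture.supplyAsync(() -> fetchBlocking(id));
        return Observable.from(future);
    }

    public static void main(String[] args) {
        queryDeferred("42").subscribe(System.out::println);
        queryFromFuture("7").subscribe(System.out::println);
    }
}
```

The practical difference: defer postpones the work until subscription, while a Future-based Observable wraps work that has already been started.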

// you have these available
Observable<Summary> summaryObservable = querySummary(summaryId);
Observable<Item> itemObservable = queryItem(itemId);

I didn't run any of the following code, so take it as guidance. You could start with:

Observable<Item> itemsObservable = querySummary(summaryId)
    .flatMap(summary -> {
        return Observable.from(summary.getItemsIdsList());
    })
    .flatMap(itemId -> {
        return queryItem(itemId);
    });

You are querying the Summary, then using the flatMap operator to emit the returned ids individually. Then you flatMap each id to its request, which merges the resulting Items into a single stream (a plain map here would give you an Observable<Observable<Item>>). At the end you can subscribe to itemsObservable to let the individual Item objects flow.
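To make the shape of that chain concrete, here is a self-contained sketch, again assuming RxJava 1.x. Summary, Item, querySummary and queryItem are hypothetical stubs returning canned data, not real HTTP calls; the point is the two flatMap steps.

```java
import java.util.Arrays;
import java.util.List;
import rx.Observable;

class ItemsPipeline {
    // Hypothetical response types standing in for your real ones.
    static class Summary {
        final List<Integer> itemIds;
        Summary(List<Integer> itemIds) { this.itemIds = itemIds; }
        List<Integer> getItemsIdsList() { return itemIds; }
    }

    static class Item {
        final int id;
        Item(int id) { this.id = id; }
        @Override public String toString() { return "Item(" + id + ")"; }
    }

    // Hypothetical stand-ins for the HTTP-backed queries.
    static Observable<Summary> querySummary(String summaryId) {
        return Observable.just(new Summary(Arrays.asList(1, 2, 3)));
    }

    static Observable<Item> queryItem(int itemId) {
        return Observable.just(new Item(itemId));
    }

    static Observable<Item> items(String summaryId) {
        return querySummary(summaryId)
            // one Summary -> a stream of its item ids
            .flatMap(summary -> Observable.from(summary.getItemsIdsList()))
            // each id -> its Item request, merged into one Observable<Item>
            .flatMap(itemId -> queryItem(itemId));
    }

    public static void main(String[] args) {
        items("s1").subscribe(item -> System.out.println(item));
    }
}
```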

If you want to save the Items in the database, you could plug in a doOnNext after the last operator, before subscribing, and save each Item there. If you want to sum a value from each Item, or do any other aggregation, you can use reduce, and so on. As it stands, with a single .subscribe() at the end, all of this code runs on the thread that subscribes, here the main thread (unless your HTTP library uses its own thread pool).

itemsObservable.doOnNext(item -> {
    // do something in whatever thread it might be running
}).subscribe();
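A sketch of the doOnNext-plus-reduce combination, assuming RxJava 1.x. The List passed in is a hypothetical stand-in for your database, and plain Integers stand in for whatever value you would sum from each Item; toBlocking().single() is there only so the example can return the total.

```java
import java.util.ArrayList;
import java.util.List;
import rx.Observable;

class SaveAndAggregate {
    static int saveAndSum(Observable<Integer> values, List<Integer> db) {
        return values
            .doOnNext(v -> db.add(v))        // side effect per item, e.g. a DB insert
            .reduce(0, (acc, v) -> acc + v)  // fold every item into a single total
            .toBlocking()
            .single();                       // wait for the one reduced value
    }

    public static void main(String[] args) {
        List<Integer> db = new ArrayList<>();
        int total = saveAndSum(Observable.range(1, 4), db);
        System.out.println("saved " + db + ", total " + total);
    }
}
```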

If you want to fire the summary request and wait for it asynchronously, then fire the items' requests as new item ids arrive, you can add .observeOn(Schedulers.io()) as you did; everything downstream of it then runs on a background thread.

Observable<Item> itemsObservable = querySummary(summaryId)
    .observeOn(Schedulers.io()) // from here, continue in the background
    .flatMap(summary -> { // ...
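One caveat, sketched below: observeOn moves the downstream work onto a single background thread, so if each item request blocks while it runs, the requests still execute one after another. To make them overlap, each inner Observable can be subscribed on its own thread with subscribeOn inside the flatMap. This assumes RxJava 1.x; fetchItem is a hypothetical blocking call that only sleeps, and results may arrive in any order.

```java
import java.util.Arrays;
import java.util.List;
import rx.Observable;
import rx.schedulers.Schedulers;

class ConcurrentItems {
    // Hypothetical blocking request: sleeps to simulate network latency.
    static String fetchItem(int id) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "item-" + id;
    }

    // Lazily wraps the blocking call; nothing runs until subscription.
    static Observable<String> queryItem(int id) {
        return Observable.defer(() -> Observable.just(fetchItem(id)));
    }

    static List<String> fetchAll(List<Integer> ids) {
        return Observable.from(ids)
            // subscribeOn per inner Observable: each request runs on its own io() thread,
            // so the sleeps overlap instead of adding up
            .flatMap(id -> queryItem(id).subscribeOn(Schedulers.io()))
            .toList()
            .toBlocking()
            .single(); // block the caller until every item has arrived
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        List<String> items = fetchAll(Arrays.asList(1, 2, 3, 4, 5));
        System.out.println(items + " in " + (System.currentTimeMillis() - start) + " ms");
    }
}
```

With five ids, the elapsed time should be closer to one 200 ms sleep than to five, though the exact figure depends on the machine.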

Notice that you are telling the Observable chain to continue its execution on another thread and then forgetting about it. In some cases, this is just what you want. But if you are running this from a plain public static void main() method as your program body, your program will shut down before the background execution finishes, and that's expected: RxJava's scheduler threads are daemon threads, so the JVM does not wait for them once main() returns. If you were dealing directly with Thread objects, you would be responsible for calling join() on them yourself, making the main program wait for them.
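A quick way to see this for yourself, assuming RxJava 1.x: the sketch asks the thread that runs a stage after observeOn(Schedulers.computation()) whether it is a daemon thread (the JVM exits once only daemon threads remain). toBlocking() is used only so the answer can be read back on the main thread.

```java
import rx.Observable;
import rx.schedulers.Schedulers;

class DaemonCheck {
    static boolean downstreamRunsOnDaemonThread() {
        return Observable.just(1)
            .observeOn(Schedulers.computation())
            // this map runs on a computation-scheduler thread, not on main
            .map(i -> Thread.currentThread().isDaemon())
            .toBlocking()
            .single();
    }

    public static void main(String[] args) {
        System.out.println("scheduler thread is daemon: " + downstreamRunsOnDaemonThread());
        System.out.println("main thread is daemon: " + Thread.currentThread().isDaemon());
    }
}
```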

Since reactive streams are meant to hide the threading complexity, you just need to "say" that you want everything back together at the end of the party. In this case, to bring the execution back to the main thread, you can plug in .toBlocking(), optionally preceded by .toList() if you want all the items at once. Then you can safely consume the results on your current thread.

itemsObservable
    .toBlocking()
    .forEach(item -> {
        // blocks the main thread until the stream completes
        // do something on each item
    });

or:

itemsObservable
    .toList()
    .toBlocking()
    .forEach(itemsList -> {
        // blocks the main thread; toList() alone would not
        // do something with the whole list at once
    });

There are lots of different ways you can compose Observables: creating one or more streams, merging them, running them asynchronously or not. Which one fits depends on your needs.

I hope it helps!

Upvotes: 0

Reut Sharabani
Reut Sharabani

Reputation: 31339

This is a guess, since you did not supply context, but you have to block if you're changing threads: the main thread is not blocked, so your program is possibly terminating before the other scheduler has had a chance to run:

Observable.range(1, 5)
        .concatMap((i) -> {
            return Observable.range(i, 2);
        })
        .observeOn(Schedulers.computation())
        .concatMap((i) -> {
            return Observable.range(i, 2);
        })
        .subscribe((i) -> { System.out.println(i); }, System.out::println,() -> { System.out.println("Complete"); });
// block to let other schedulers finish
Thread.sleep(3000);
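Thread.sleep(3000) only works if the pipeline happens to finish within three seconds. A sketch of an alternative that waits exactly until the stream terminates, assuming RxJava 1.x: the onError and onCompleted callbacks count down a java.util.concurrent.CountDownLatch, and the caller awaits it. The items are collected into a list only so the result can be checked.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import rx.Observable;
import rx.schedulers.Schedulers;

class WaitForCompletion {
    static List<Integer> runAndWait() throws InterruptedException {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        Observable.range(1, 5)
            .concatMap(i -> Observable.range(i, 2))
            .observeOn(Schedulers.computation())
            .concatMap(i -> Observable.range(i, 2))
            .subscribe(
                seen::add,                                       // onNext
                e -> { e.printStackTrace(); done.countDown(); }, // onError
                done::countDown);                                // onCompleted
        done.await(); // the calling thread waits exactly until the stream terminates
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndWait());
        System.out.println("Complete");
    }
}
```

toBlocking(), as used in the other answer, achieves the same with less code; the latch is handy when you want to keep an ordinary subscribe() call.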

Upvotes: 2
