Hendrik Jander

Reputation: 5715

RxJava Code Execution Flow - map vs. flatMap

I am having a hard time learning RxJava and reactive programming while writing a web crawler with JerseyRx and the RxMongo driver. My code looks like this:

    Observable.interval( 200, TimeUnit.MILLISECONDS ).map(tick -> links(linksCollection)
            .map(linkDoc -> httpGet(client, linkDoc.getString("url"))
                    .map(htmlDoc -> parseLinks(htmlDoc)).subscribe() )
            .subscribe())
    .subscribe();

The signatures of links(), httpGet() and parseLinks() are as follows:

    Observable<Document> links(MongoCollection<Document> linkCollection)
    Observable<String> httpGet(RxClient<RxObservableInvoker> client, String url)
    List<HtmlLink> parseLinks(final String html)

If each method prints its name when it is called, the output looks like this:

Links Get Links Parse Links Links Links Links

Get and Parse are called only once. Can somebody please explain why the program flows like this and how to fix it?

Objects that are not declared in this question are left out only for brevity.

Upvotes: 0

Views: 367

Answers (1)

Simon Baslé

Reputation: 28301

When you want to trigger an asynchronous sub-process (like fetching the HTTP document for a link), you should use flatMap. It subscribes to each sub-Observable and flattens it, emitting its items as another "burst" of emissions in the output Observable:

    Observable.interval(200, TimeUnit.MILLISECONDS)
        //this is a continuous flow of n documents, repeated every 200ms
        .flatMap(tick -> links(linksCollection))
        //this is a continuous Observable<String> with n Strings every 200ms
        .flatMap(linkDoc -> httpGet(client, linkDoc.getString("url")))
        //here you have a flow of HTML documents; get the list of links for each one
        .map(htmlDoc -> parseLinks(htmlDoc));

Assigning this chain to a variable yields an Observable<List<HtmlLink>>, to which you can subscribe.
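The chain above depends on RxJava, Jersey and MongoDB, so here is a minimal synchronous sketch of the same shape using java.util.stream.Stream, whose map and flatMap behave analogously for flattening. The links(), httpGet() and parseLinks() methods below are hypothetical stubs (not the question's real methods) that only record their names, and IntStream.range stands in for Observable.interval. Unlike RxJava's flatMap, which merges asynchronous sub-Observables and may interleave their items, this version runs sequentially and deterministically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Synchronous analogy of the flatMap pipeline using java.util.stream instead of RxJava.
// links(), httpGet() and parseLinks() are HYPOTHETICAL stubs that only record their names.
class FlatMapSketch {
    // Names of the stub methods, in the order they were called.
    static final List<String> calls = new ArrayList<>();

    // Hypothetical stand-in for links(linksCollection): two link URLs per tick.
    static Stream<String> links() {
        calls.add("Links");
        return Stream.of("a", "b");
    }

    // Hypothetical stand-in for httpGet(client, url): one HTML string per URL.
    static Stream<String> httpGet(String url) {
        calls.add("Get");
        return Stream.of("<html>" + url + "</html>");
    }

    // Hypothetical stand-in for parseLinks(html): wraps the HTML in a list.
    static List<String> parseLinks(String html) {
        calls.add("Parse");
        return List.of(html);
    }

    // Same shape as the chain above; IntStream.range stands in for Observable.interval.
    static List<List<String>> run(int ticks) {
        calls.clear();
        return IntStream.range(0, ticks).boxed()
                .flatMap(tick -> links())      // each tick -> its link URLs, flattened
                .flatMap(url -> httpGet(url))  // each URL -> its HTML, flattened
                .map(html -> parseLinks(html)) // each HTML -> one List of links
                .collect(Collectors.toList());
    }

    // map keeps every inner Stream as a single, unconsumed element: the result is nested.
    static List<Stream<String>> withMap() {
        return Stream.of("a", "b").map(url -> httpGet(url)).collect(Collectors.toList());
    }

    // flatMap consumes each inner Stream and emits its items into one flat Stream.
    static List<String> withFlatMap() {
        return Stream.of("a", "b").flatMap(url -> httpGet(url)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> parsed = run(2);
        System.out.println(calls);          // order in which the stubs were called
        System.out.println(parsed.size());  // one parse result per fetched page
        System.out.println(withFlatMap());
    }
}
```

With flatMap at each step, run(2) calls Links once per tick and then Get and Parse once per link, for every tick. withMap(), by contrast, returns a List holding two Stream objects that were never consumed, which is the nested shape flatMap avoids.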

Upvotes: 2
