Lucas Ferraz
Lucas Ferraz

Reputation: 4152

RxJava filter and emit other items

Its possible to filter and continue emiting itens like below ?


My code that calls subscriber 2 times:

Observable<Map.Entry<String, ArrayList<MockOverview>>> requestEntries =
        this.requestView.request(request)
        .map(HashMap::entrySet)
        .flatMapIterable(entries -> entries);

requestEntries.filter(entry -> entry.getKey().equals("featured"))
        .map((Func1<Map.Entry<String, ArrayList<MockOverview>>, List<MockOverview>>) Map.Entry::getValue)
        .subscribe(mockOverviews -> {
            Log.i("subscrive", "featured");
        });

requestEntries.filter(entry -> entry.getKey().equals("done"))
        .map((Func1<Map.Entry<String, ArrayList<MockOverview>>, List<MockOverview>>) Map.Entry::getValue)
        .subscribe(mockOverviews -> {
            Log.i("subscrive", "featured");
        });

What i want:

 requestEntries.filter(entry -> entry.getKey().equals("featured"))
        .map((Func1<Map.Entry<String, ArrayList<MockOverview>>, List<MockOverview>>) Map.Entry::getValue)
        .subscribe(mockOverviews -> {

        })
        .filter(entry -> entry.getKey().equals("done"))
        .map((Func1<Map.Entry<String, ArrayList<MockOverview>>, List<MockOverview>>) Map.Entry::getValue)
        .subscribe(mockOverviews -> {

        });

Upvotes: 2

Views: 1113

Answers (2)

RvanHeest
RvanHeest

Reputation: 869

By the looks of things your second version is not equal to your first: the former looks at the requestEntries stream twice, filters on featured and done keys respectively and does its own things with it. Your second version however first filters on featured first then does some transformations and side-effects and then filter out the done. However, that Observable<entryset> is not at all in scope in that second filter lambda.

What you need to do here is use publish(<lambda>) on requestEntries and in the lambda do the stuff from your first version, use onNext instead of subscribe, merge the streams and return that combined stream. Then outside of the publish you subscribe once (and do nothing in there) or go on and use the result of your stream somewhere else.

requestEntries.publish(re -> {
    Observable<...> x = re.filter(...<featured>...).map(...).doOnNext(...Log.i(...));
    Observable<...> y = re.filter(...<done>...).map(...).doOnNext(...Log.i(...));
    return x.mergeWith(y);
})

Upvotes: 3

akarnokd
akarnokd

Reputation: 69997

You can use doOnNext in the place of the first subscribe()

 requestEntry.filter(v -> ...)
 .map(v -> ...)
 .doOnNext(v -> ...)
 .filter(v -> ...)
 .map(v -> ...)
 .subscribe(...)

or use publish(Func1):

 requestEntry.filter(v -> ...)
 .map(v -> ...)
 .publish(o -> {
     o.subscribe(...);
     return o;
 })
 .filter(v -> ...)
 .map(v -> ...)
 .subscribe(...)

Upvotes: 2

Related Questions