Brandon Bil

Reputation: 425

Split Rx Observable into multiple streams and process individually

Here is a picture of what I am attempting to accomplish.

--a-b-c-a--bbb--a

split into

--a-----a-------a --> a stream

----b------bbb--- --> b stream

------c---------- --> c stream

Then, be able to

a.subscribe()
b.subscribe()
c.subscribe()

So far, everything I have found splits the stream using groupBy(), but then collapses everything back into a single stream and processes all the groups in the same function. What I want to do is process each derived stream in a different way.

Right now I'm doing it with a bunch of filters. Is there a better way to do this?

Upvotes: 40

Views: 33870

Answers (4)

darekxan

Reputation: 498

In RxJava there is an overload of the publish operator that takes a function. For example (Kotlin):

ObservableTransformer {
  it.publish { shared ->
    Observable.merge(
        shared.ofType(x).compose(transformerherex),
        shared.ofType(y).compose(transformerherey)
    )
  }
}

This splits the event stream by type. You can then process each type separately by composing it with a different transformer. All of them share a single subscription to the source.

Upvotes: 1

ihuk

Reputation: 1344

You don't have to collapse Observables from groupBy. You can instead subscribe to them.

Something like this:

String[] inputs = {"a", "b", "c", "a", "b", "b", "b", "a"};

// One handler per group key
Action1<String> a = s -> System.out.print("-a-");
Action1<String> b = s -> System.out.print("-b-");
Action1<String> c = s -> System.out.print("-c-");

Observable
    .from(inputs)
    .groupBy(s -> s)
    .subscribe((g) -> {
        if ("a".equals(g.getKey())) {
            g.subscribe(a);
        }

        if ("b".equals(g.getKey())) {
            g.subscribe(b);
        }

        if ("c".equals(g.getKey())) {
            g.subscribe(c);
        }
    });

The if statements look kind of ugly, but at least you can handle each stream separately. Maybe there is a way of avoiding them.
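
One possible way to avoid the if chain is a lookup table from group key to handler. Below is a plain-Java sketch of just the dispatch part, with no RxJava dependency so that it runs on its own. GroupDispatch and route are made-up names for illustration, and java.util.function.Consumer stands in for Action1.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper, not part of RxJava: routes each item to the
// handler registered for its key instead of testing keys with if.
class GroupDispatch {

    static String route(List<String> inputs) {
        StringBuilder out = new StringBuilder();

        // One handler per key; each could do something entirely different.
        Map<String, Consumer<String>> handlers = new HashMap<>();
        handlers.put("a", s -> out.append("-a-"));
        handlers.put("b", s -> out.append("-b-"));
        handlers.put("c", s -> out.append("-c-"));

        for (String item : inputs) {
            Consumer<String> handler = handlers.get(item);
            if (handler != null) {   // keys without a handler are ignored
                handler.accept(item);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(
            route(Arrays.asList("a", "b", "c", "a", "b", "b", "b", "a")));
    }
}
```

With RxJava the same idea would presumably mean storing Action1<String> values in the map and reducing the subscriber body to a lookup by g.getKey() followed by g.subscribe(handler), keeping the null check for keys that have no handler.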

Upvotes: 18

JC Olivares

Reputation: 140

I have been thinking about this, and Tomáš's solution is OK, but the issue is that it converts the stream to a hot observable.

You can use share in combination with defer to get a cold observable that still splits into separate streams.

For example (Java):

var originalObservable = ...; // some source
var coldObservable = Observable.defer(() -> {
    var shared = originalObservable.share();
    var aSource = shared.filter(x -> x.equals("a"));
    var bSource = shared.filter(x -> x.equals("b"));
    var cSource = shared.filter(x -> x.equals("c"));
    // some logic for sources
    return shared;
});

Upvotes: 1

Tomáš Dvořák

Reputation: 1520

Easy as pie: just use filter.

An example in Scala:

import rx.lang.scala.Observable

val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")
val hotO: Observable[String] = o.share
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a")
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b")
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c")

aSource.subscribe(o ⇒ println("A: " + o), println, () ⇒ println("A Completed"))

bSource.subscribe(o ⇒ println("B: " + o), println, () ⇒ println("B Completed"))

cSource.subscribe(o ⇒ println("C: " + o), println, () ⇒ println("C Completed"))

You just need to make sure that the source observable is hot. The easiest way is to share it.

Upvotes: 43
