Reputation: 425
Here is a picture of what I am attempting to accomplish.
--a-b-c-a--bbb--a
split into
--a-----a-------a --> a stream
----b------bbb--- --> b stream
------c---------- --> c stream
Then, be able to
a.subscribe()
b.subscribe()
c.subscribe()
So far, everything I have found has split the stream using a groupBy(), but then collapsed everything back into a single stream and process them all in the same function. What I want to do is process each derived stream in a different way.
The way I'm doing it right now is doing a bunch of filters. Is there a better way to do this?
Upvotes: 40
Views: 33870
Reputation: 498
In RxJava there is a special version of publish operator that takes a function.
ObservableTransformer {
it.publish { shared ->
Observable.merge(
shared.ofType(x).compose(transformerherex),
shared.ofType(y).compose(transformerherey)
)
}
}
This splits the event stream by type. Then you can process them separately by composing with different transformers. All of them share single subscription.
Upvotes: 1
Reputation: 1344
You don't have to collapse Observables
from groupBy
. You can instead subscribe to them.
Something like this:
String[] inputs= {"a", "b", "c", "a", "b", "b", "b", "a"};
Action1<String> a = s -> System.out.print("-a-");
Action1<String> b = s -> System.out.print("-b-");
Action1<String> c = s -> System.out.print("-c-");
Observable
.from(inputs)
.groupBy(s -> s)
.subscribe((g) -> {
if ("a".equals(g.getKey())) {
g.subscribe(a);
}
if ("b".equals(g.getKey())) {
g.subscribe(b);
}
if ("c".equals(g.getKey())) {
g.subscribe(c);
}
});
If statements look kinda ugly but at least you can handle each stream separately. Maybe there is a way of avoiding them.
Upvotes: 18
Reputation: 140
I have been thinking about this and Tomas solution is OK, but the issue is that it converts the stream to a hot observable.
You can use share
in combination with defer
in order to get a cold observable with other streams.
For example (Java):
var originalObservable = ...; // some source
var coldObservable = Observable.defer(() -> {
var shared - originalObservable.share();
var aSource = shared.filter(x -> x.equals("a"));
var bSource = shared.filter(x -> x.equals("b"));
var cSource = shared.filter(x -> x.equals("c"));
// some logic for sources
return shared;
});
Upvotes: 1
Reputation: 1520
Easy as pie, just use filter
An example in scala
import rx.lang.scala.Observable
val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")
val hotO: Observable[String] = o.share
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a")
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b")
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c")
aSource.subscribe(o ⇒ println("A: " + o), println, () ⇒ println("A Completed"))
bSource.subscribe(o ⇒ println("B: " + o), println, () ⇒ println("B Completed"))
cSource.subscribe(o ⇒ println("C: " + o), println, () ⇒ println("C Completed"))
You just need to make sure that the source observable is hot. The easiest way is to share
it.
Upvotes: 43