Reputation: 20384
I have an Observable that emits "expensive" values to fetch (think slow network) and I need to process the individual items as well as some aggregates. I'd like to get away with a single I want the Observable to emit its values only once for all subscribers.subscribe(...)
.
I created an example to demo where I'm stuck. In the example I use a collection as source of data - don't get distracted by that. Running through a collection multiple times is "cheap" - but that's not my use case, so don't zoom into the use of collection here ;-).
I have a simple class:
public class Lego {
public final String color;
public final String shape;
public final String size;
public final int nippels;
public Lego(final String color, final String shape, final String size, final int nippels) {
this.color = color;
this.shape = shape;
this.size = size;
this.nippels = nippels;
}
@Override
public String toString() {
return String.format("Color: %s, Shape: %s, Size: %s, Nipples: %d",this.color,this.shape,this.size,this.nippels);
}
}
And a collection of these objects:
public Observable<Lego> getLegoSample() {
Collection<Lego> result = new ArrayList<Lego>();
result.add(new Lego("red", "cube", "xl", 6));
result.add(new Lego("blue", "box", "s", 2));
result.add(new Lego("green", "cube", "l", 4));
result.add(new Lego("yellow", "odd", "m", 3));
result.add(new Lego("red", "sphere", "xs", 0));
result.add(new Lego("blue", "box", "xl", 8));
result.add(new Lego("green", "odd", "s", 1));
result.add(new Lego("green", "odd", "m", 1));
result.add(new Lego("green", "odd", "l", 1));
result.add(new Lego("yellow", "odd", "m", 3));
result.add(new Lego("blue", "box", "m", 4));
result.add(new Lego("green", "cube", "l", 4));
result.add(new Lego("yellow", "sphere", "m", 3));
result.add(new Lego("red", "sphere", "xl", 7));
result.add(new Lego("blue", "cube", "xl", 8));
result.add(new Lego("green", "odd", "s", 1));
result.add(new Lego("yellow", "box", "s", 1));
result.add(new Lego("yellow", "sphere", "l", 1));
Observable<Lego> observable = Observable.from(result);
return observable;
}
to get started (Don't get hung up on the Collection nature, it's just the sample). The output that I need is:
I can produce - more or less - these outputs separately (not sure of #2):
getLegoSample().subscribe();
getLegoSample().count().subscribe();
getLegoSample().map(LegotoNipples).sum().subscibe();
for 2. I used collect()
- is that the right way?
getLegoSample().collect(new HashMap<String,Integer>(), new Action2<Map<String,Integer>,Lego>(){
@Override
public void call(Map<String, Integer> t1, Lego t2) {
if (t1.containsKey(t2.color)) {
t1.put(t2.color,(t1.get(t2.color).intValue()+1));
} else {
t1.put(t2.color, 1);
}
}}).subscribe();
2 Questions:
Update (4Sep):
Reading the huge documentation helps (my bad). I was looking literally at the wrong end. To get the Observable to emit only once for all potential subscribers, it needs to be a Connectable Observable, a subclass of Observable. This can be achieved using publish()
on any Observable.
A Connectable Observable does not emit any values until its connect()
method is called.
So my code would look like this:
ConnectableObservable co = getLegoSample().publish();
co.subscribe();
co.collect(new HashMap<String,Integer>(), new Action2<Map<String,Integer>,Lego>(){
@Override
public void call(Map<String, Integer> t1, Lego t2) {
if (t1.containsKey(t2.color)) {
t1.put(t2.color,(t1.get(t2.color).intValue()+1));
} else {
t1.put(t2.color, 1);
}
}}).subscribe();
co.count().subscribe();
co.map(LegotoNipples).sum().subscribe();
co.connect();
Remains the question on collect()
for aggregation and eventually, when using different schedulers, how to best orchestrate (but there are answers around for that).
(And yes, I'm stuck with Java < 8). Gist of my tests
Upvotes: 1
Views: 593
Reputation: 1960
Also you can use Observable.groupBy
for aggregation. Consider this sample:
Observable<NamedCount<String>> countStream =
getLegoSample()
.groupBy(Lego::getColor)
.flatMap(obs -> obs.count().map(count -> new NamedCount<>(obs.getKey(), count)))
With helper
public class NamedCount<T> {
private final T name;
private final int count;
public NamedCount(T name, int count) {
this.name = name;
this.count = count;
}
public T getName() {
return name;
}
public int getCount() {
return count;
}
}
Subscribing to it
countStream.subscribe(System.out::println);
produces following output
NamedCount{name=red, count=3}
NamedCount{name=green, count=6}
NamedCount{name=blue, count=4}
NamedCount{name=yellow, count=5}
Changing thr function passed to Observable.groupBy()
one can get other aggregations (count by shape, size etc)
Upvotes: 1