stwissel

How to return aggregates in RxJava - is collect() the right approach?

I have an Observable that emits values that are "expensive" to fetch (think slow network), and I need to process the individual items as well as compute some aggregates. I'd like to get away with a single subscribe(...), and I want the Observable to emit its values only once for all subscribers.

I created an example to demonstrate where I'm stuck. In the example I use a collection as the data source; don't get distracted by that. Running through a collection multiple times is "cheap", but that's not my use case, so don't zoom in on the use of a collection here ;-).

I have a simple class:

public class Lego {

  public final String color;
  public final String shape;
  public final String size;
  public final int nippels;

  public Lego(final String color, final String shape, final String size, final int nippels) {
    this.color = color;
    this.shape = shape;
    this.size = size;
    this.nippels = nippels;
  }

  @Override
  public String toString() {
    return String.format("Color: %s, Shape: %s, Size: %s, Nipples: %d",this.color,this.shape,this.size,this.nippels);
  }

}

And a collection of these objects:

  public Observable<Lego> getLegoSample() {
    Collection<Lego> result = new ArrayList<Lego>();
    result.add(new Lego("red", "cube", "xl", 6));
    result.add(new Lego("blue", "box", "s", 2));
    result.add(new Lego("green", "cube", "l", 4));
    result.add(new Lego("yellow", "odd", "m", 3));
    result.add(new Lego("red", "sphere", "xs", 0));
    result.add(new Lego("blue", "box", "xl", 8));
    result.add(new Lego("green", "odd", "s", 1));
    result.add(new Lego("green", "odd", "m", 1));
    result.add(new Lego("green", "odd", "l", 1));
    result.add(new Lego("yellow", "odd", "m", 3));
    result.add(new Lego("blue", "box", "m", 4));
    result.add(new Lego("green", "cube", "l", 4));
    result.add(new Lego("yellow", "sphere", "m", 3));
    result.add(new Lego("red", "sphere", "xl", 7));
    result.add(new Lego("blue", "cube", "xl", 8));
    result.add(new Lego("green", "odd", "s", 1));
    result.add(new Lego("yellow", "box", "s", 1));
    result.add(new Lego("yellow", "sphere", "l", 1));
    Observable<Lego> observable = Observable.from(result);
    return observable;
  }

to get started (don't get hung up on the Collection nature; it's just the sample). The output I need is:

  1. All individual items (they will go through map and transform later)
  2. Number of items per colour, per shape and per size
  3. Total number of items
  4. Sum of nippels

I can produce these outputs, more or less, separately (not sure about #2), numbered to match the list above:

  1. getLegoSample().subscribe();
  3. getLegoSample().count().subscribe();
  4. getLegoSample().map(LegotoNipples).sum().subscribe();

For #2 I used collect(). Is that the right way?

getLegoSample().collect(new HashMap<String, Integer>(), new Action2<Map<String, Integer>, Lego>() {
  @Override
  public void call(Map<String, Integer> counts, Lego lego) {
    // count items per color
    if (counts.containsKey(lego.color)) {
      counts.put(lego.color, counts.get(lego.color).intValue() + 1);
    } else {
      counts.put(lego.color, 1);
    }
  }
}).subscribe();

Two questions:

  1. Is collect() the right approach? (I would use a specialised collection object, not just a HashMap.)
  2. How can I combine these sequences so that I only run through my original Observable once?
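For question 1, one option for the "specialised collection object" is a single accumulator that folds each item into all of outputs 2 to 4 at once, so one pass yields every aggregate. This is a plain-Java sketch without RxJava; LegoStats, its fields and the increment helper are hypothetical names, and Lego is trimmed to the fields the accumulator reads. It avoids lambdas and Map.merge() so it stays Java 7 compatible.

```java
import java.util.HashMap;
import java.util.Map;

// Trimmed copy of the question's Lego class so this sketch compiles on its own.
class Lego {
  final String color;
  final String shape;
  final String size;
  final int nippels;

  Lego(String color, String shape, String size, int nippels) {
    this.color = color;
    this.shape = shape;
    this.size = size;
    this.nippels = nippels;
  }
}

// Hypothetical accumulator that gathers outputs 2 to 4 in a single pass.
class LegoStats {
  final Map<String, Integer> byColor = new HashMap<String, Integer>();
  final Map<String, Integer> byShape = new HashMap<String, Integer>();
  final Map<String, Integer> bySize = new HashMap<String, Integer>();
  int total;      // output 3: total number of items
  int nippelSum;  // output 4: sum of nippels

  // Folds one item into every aggregate at once.
  void add(Lego lego) {
    increment(byColor, lego.color);
    increment(byShape, lego.shape);
    increment(bySize, lego.size);
    total++;
    nippelSum += lego.nippels;
  }

  // Adds 1 to the count stored under key, starting from 0 for unseen keys.
  private static void increment(Map<String, Integer> counts, String key) {
    Integer current = counts.get(key);
    counts.put(key, current == null ? 1 : current + 1);
  }

  public static void main(String[] args) {
    LegoStats stats = new LegoStats();
    Lego[] sample = {
        new Lego("red", "cube", "xl", 6),
        new Lego("blue", "box", "s", 2),
        new Lego("green", "cube", "l", 4)
    };
    // A plain loop stands in for the single pass over the Observable.
    for (Lego lego : sample) {
      stats.add(lego);
    }
    System.out.println(stats.byColor + " total=" + stats.total + " nippels=" + stats.nippelSum);
  }
}
```

In the RxJava setup, an instance would be the state handed to collect(), with an Action2 whose call() simply delegates to add(); once the source completes, that one state object carries every aggregate. Depending on the RxJava version, collect() takes either the initial state object itself or a Func0 factory for it, so check the signature of the version in use. Output 1 (the individual items) still needs its own subscriber.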

Update (4 Sep):

Reading the extensive documentation helps (my bad). I was literally looking at the wrong end. To get the Observable to emit only once for all potential subscribers, it needs to be a ConnectableObservable, a subclass of Observable. Any Observable can be turned into one by calling publish() on it.

A ConnectableObservable does not emit any values until its connect() method is called.

So my code would look like this:

ConnectableObservable<Lego> co = getLegoSample().publish();
co.subscribe(); // individual items
co.collect(new HashMap<String, Integer>(), new Action2<Map<String, Integer>, Lego>() {
  @Override
  public void call(Map<String, Integer> counts, Lego lego) {
    // count items per color
    if (counts.containsKey(lego.color)) {
      counts.put(lego.color, counts.get(lego.color).intValue() + 1);
    } else {
      counts.put(lego.color, 1);
    }
  }
}).subscribe();
co.count().subscribe();
co.map(LegotoNipples).sum().subscribe();
co.connect(); // only now does the source emit, once, to all subscribers above
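The publish()/connect() contract described above can be modeled in a few lines of plain Java, which makes the "emit once for everyone" behaviour easy to see. This is a toy, not RxJava's implementation: ToyConnectable and ItemHandler are hypothetical names, and the real ConnectableObservable also propagates completion and errors and handles unsubscription, none of which this sketch attempts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical callback type standing in for an RxJava Observer/Action1.
interface ItemHandler<T> {
  void accept(T item);
}

// Toy model of publish()/connect(): subscribe() only registers a handler;
// connect() walks the source once and hands every item to all handlers.
class ToyConnectable<T> {
  private final Iterable<T> source;
  private final List<ItemHandler<T>> handlers = new ArrayList<ItemHandler<T>>();
  int passes; // number of times the source has been traversed

  ToyConnectable(Iterable<T> source) {
    this.source = source;
  }

  void subscribe(ItemHandler<T> handler) {
    handlers.add(handler); // nothing is emitted yet
  }

  void connect() {
    passes++;
    for (T item : source) {
      for (ItemHandler<T> handler : handlers) {
        handler.accept(item);
      }
    }
  }

  public static void main(String[] args) {
    ToyConnectable<Integer> co = new ToyConnectable<Integer>(Arrays.asList(6, 2, 4));
    final int[] count = {0};
    final int[] sum = {0};
    co.subscribe(new ItemHandler<Integer>() {
      public void accept(Integer item) { count[0]++; }
    });
    co.subscribe(new ItemHandler<Integer>() {
      public void accept(Integer item) { sum[0] += item; }
    });
    co.connect(); // one traversal serves both handlers
    System.out.println("count=" + count[0] + " sum=" + sum[0] + " passes=" + co.passes);
    // prints count=3 sum=12 passes=1
  }
}
```

This is also why connect() comes last in the snippet above: a subscriber registered after connect() has run would miss the items that were already pushed.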

What remains is the question of whether collect() is the right tool for aggregation and, eventually, how best to orchestrate everything when different schedulers are involved (but there are answers around for that).


(And yes, I'm stuck with Java < 8). Gist of my tests

Answers (1)

m.ostroverkhov

You can also use Observable.groupBy() for aggregation. Consider this sample:

Observable<NamedCount<String>> countStream =
  getLegoSample()
    .groupBy(lego -> lego.color) // Lego exposes public fields, not getters
    .flatMap(obs -> obs.count().map(count -> new NamedCount<>(obs.getKey(), count)));

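with the helper class

public class NamedCount<T> {

    private final T name;
    private final int count;

    public NamedCount(T name, int count) {
        this.name = name;
        this.count = count;
    }

    public T getName() {
        return name;
    }

    public int getCount() {
        return count;
    }

    // needed for the printed output below
    @Override
    public String toString() {
        return "NamedCount{name=" + name + ", count=" + count + "}";
    }
}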

Subscribing to it

countStream.subscribe(System.out::println);

produces the following output:

NamedCount{name=red, count=3}
NamedCount{name=green, count=6}
NamedCount{name=blue, count=4}
NamedCount{name=yellow, count=5}

Changing the function passed to Observable.groupBy() gives other aggregations (count by shape, size, etc.).
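As a cross-check of what groupBy plus count should yield, the same grouping can be done without RxJava using java.util.stream's Collectors.groupingBy() with Collectors.counting(). This is a stdlib analogue, not the RxJava pipeline itself; GroupCounts and countBy are hypothetical names, and sample() copies the 18 items from the question. For color it gives red=3, green=6, blue=4, yellow=5, matching the output above; swapping the key extractor gives the counts per shape or size.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Trimmed copy of the question's Lego class.
class Lego {
  final String color;
  final String shape;
  final String size;
  final int nippels;

  Lego(String color, String shape, String size, int nippels) {
    this.color = color;
    this.shape = shape;
    this.size = size;
    this.nippels = nippels;
  }
}

class GroupCounts {
  // Counts items per key; the key extractor plays the role of the
  // function passed to Observable.groupBy().
  static Map<String, Long> countBy(List<Lego> legos, Function<Lego, String> key) {
    return legos.stream().collect(Collectors.groupingBy(key, Collectors.counting()));
  }

  // The 18 items from getLegoSample() in the question.
  static List<Lego> sample() {
    return Arrays.asList(
        new Lego("red", "cube", "xl", 6),
        new Lego("blue", "box", "s", 2),
        new Lego("green", "cube", "l", 4),
        new Lego("yellow", "odd", "m", 3),
        new Lego("red", "sphere", "xs", 0),
        new Lego("blue", "box", "xl", 8),
        new Lego("green", "odd", "s", 1),
        new Lego("green", "odd", "m", 1),
        new Lego("green", "odd", "l", 1),
        new Lego("yellow", "odd", "m", 3),
        new Lego("blue", "box", "m", 4),
        new Lego("green", "cube", "l", 4),
        new Lego("yellow", "sphere", "m", 3),
        new Lego("red", "sphere", "xl", 7),
        new Lego("blue", "cube", "xl", 8),
        new Lego("green", "odd", "s", 1),
        new Lego("yellow", "box", "s", 1),
        new Lego("yellow", "sphere", "l", 1));
  }

  public static void main(String[] args) {
    List<Lego> legos = sample();
    System.out.println(countBy(legos, lego -> lego.color));
    System.out.println(countBy(legos, lego -> lego.shape));
    System.out.println(countBy(legos, lego -> lego.size));
  }
}
```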
