FinalFive

Reputation: 1485

Using RxJava to process varying object stream

I'm trying to process a stream of objects (from an HTTP JSON request).

The Observable emits items like this:

"2015-05-06T13:24:20Z", Foo, Foo, 1, 2, 3, Foo, Foo

The first item is a timestamp, then Foo objects to store in the db, then ids that represent Foo objects that need to be deleted from the db and finally Foo objects that need to be updated (I'm going to do an upsert for them).

My current implementation looks like this:

public void updateFoos(final CallBack callBack) {

    final String lastFooUpdateTimestamp = localStorage.getLastFooUpdateTimestamp();

    fooService.getFoos(lastFooUpdateTimestamp)
            .subscribe(new Subscriber<Object>() {
                @Override
                public void onCompleted() {
                    callBack.onSuccess();
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Object o) {
                    if (o instanceof String) {
                        localStorage.setLastFooUpdateTimestamp((String) o);
                    }

                    if (o instanceof Foo) {
                        databaseManager.save((Foo) o);
                    }
                }
            });
}

There are a number of issues:

  1. instanceof checks are not very RxJava-like. Is there a better way?
  2. The timestamp is always the first item. Is there any way to express that cleanly?
  3. I also want to batch db inserts, so a separate block that handles Foo objects and also batches them would be good.
  4. Is there a better design where I emit multiple Observables by type? If so, how do I subscribe with multiple Observers?

Upvotes: 3

Views: 270

Answers (1)

akarnokd

Reputation: 69997

Here is an example of how it could be done with RxJava:

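import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.functions.Func1;

public class MultikindSource {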
    enum ValueType {
        TIMESTAMP,
        NUMBER,
        FOO
    }
    static final class Foo { }
    static Observable<Object> source(String timestamp) {
        return Observable.from(Arrays.asList(timestamp, new Foo(), new Foo(),
            1, 2, 3, new Foo()));
    }
    public static void main(String[] args) {
        Func1<Object, ValueType> keySelector = o -> {
            if (o instanceof String) {
                return ValueType.TIMESTAMP;
            } else
            if (o instanceof Foo) {
                return ValueType.FOO;
            }
            return ValueType.NUMBER;
        };
        AtomicReference<String> lastTimestamp = new AtomicReference<>(
            "2015-05-08T11:38:00.000Z");
        source(lastTimestamp.get())
        .groupBy(keySelector)
        .flatMap(g -> {
            if (g.getKey() == ValueType.TIMESTAMP) {
                g.subscribe(v -> {
                    System.out.println("Updating timestamp to " + v);
                    lastTimestamp.set((String)v);
                });
            } else
            if (g.getKey() == ValueType.FOO) {
                g.buffer(2).subscribe(v -> 
                    System.out.println("Batch inserting " + v));
            } else {
                g.subscribe(v -> 
                    System.out.println("Got some number: " + v));
            }
            return Observable.just(1);
        }).count().subscribe(v -> 
            System.out.println("Got " + v + " kinds of events."));
    }
}

Essentially, you use groupBy to classify each item into an enum value, then subscribe to each resulting group to do the work for that kind of item (buffering the Foo group so the inserts happen in batches).
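
To make the effect of groupBy and buffer(2) on the sample data easier to see, here is a rough plain-Java sketch (no RxJava involved) that does the same classification and batching eagerly on a list. It is only an illustration: the class name GroupingSketch and the helpers classify, groupByKind and batches are made up for this sketch, and Foo is an empty stand-in for the real class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class, not part of RxJava or of the code above.
class GroupingSketch {
    enum ValueType { TIMESTAMP, NUMBER, FOO }

    // Empty stand-in for the real Foo class.
    static final class Foo { }

    // Same decision the keySelector makes: String -> TIMESTAMP, Foo -> FOO, else NUMBER.
    static ValueType classify(Object o) {
        if (o instanceof String) {
            return ValueType.TIMESTAMP;
        }
        if (o instanceof Foo) {
            return ValueType.FOO;
        }
        return ValueType.NUMBER;
    }

    // Eager analogue of groupBy: one list per kind, items kept in arrival order.
    static Map<ValueType, List<Object>> groupByKind(List<Object> items) {
        Map<ValueType, List<Object>> groups = new EnumMap<>(ValueType.class);
        for (Object item : items) {
            groups.computeIfAbsent(classify(item), k -> new ArrayList<>()).add(item);
        }
        return groups;
    }

    // Eager analogue of buffer(size): chunks of at most `size`; the last chunk may be smaller.
    static <T> List<List<T>> batches(List<T> items, int size) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            int end = Math.min(i + size, items.size());
            result.add(new ArrayList<>(items.subList(i, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Object> items = Arrays.asList("2015-05-08T11:38:00.000Z",
            new Foo(), new Foo(), 1, 2, 3, new Foo());
        Map<ValueType, List<Object>> groups = groupByKind(items);
        System.out.println("Kinds of events: " + groups.size());
        System.out.println("Numbers: " + groups.get(ValueType.NUMBER));
        System.out.println("Foo batches: " + batches(groups.get(ValueType.FOO), 2).size());
    }
}
```

With three Foo items and a batch size of 2, the Foo group ends up as one full batch and one batch holding the leftover item, which is also how buffer(2) behaves when the group completes. The difference is that the RxJava version does this lazily as items arrive, rather than collecting everything first.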

Upvotes: 4
