Reputation: 1485
I'm trying to process a stream of objects (via an HTTP JSON request).
The Observable returns items like this:
"2015-05-06T13:24:20Z", Foo, Foo, 1, 2, 3, Foo, Foo
The first item is a timestamp, then Foo objects to store in the db, then ids that represent Foo objects that need to be deleted from the db and finally Foo objects that need to be updated (I'm going to do an upsert for them).
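As a rough illustration of that layout (not the actual service code), the items could be split by their position in the sequence as sketched below. `Foo` is an empty stand-in, the ids are assumed to arrive as `Integer`, and `Parsed` and `parse` are hypothetical names. The sketch also assumes at least one id sits between the two runs of Foo objects; without ids, inserts and upserts cannot be told apart by position alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FooStreamLayout {
    // Hypothetical stand-in for the real Foo entity.
    static final class Foo { }

    // Holds the four parts of one response, split by position in the stream.
    static final class Parsed {
        String timestamp;
        final List<Foo> toInsert = new ArrayList<>();
        final List<Integer> toDelete = new ArrayList<>();
        final List<Foo> toUpsert = new ArrayList<>();
    }

    static Parsed parse(List<Object> items) {
        Parsed parsed = new Parsed();
        boolean idsSeen = false; // flips once the first id arrives
        for (Object o : items) {
            if (o instanceof String) {
                parsed.timestamp = (String) o;
            } else if (o instanceof Integer) {
                // Assumption: ids are deserialized as Integer.
                idsSeen = true;
                parsed.toDelete.add((Integer) o);
            } else if (o instanceof Foo) {
                // Foo objects before the ids are inserts, those after are upserts.
                (idsSeen ? parsed.toUpsert : parsed.toInsert).add((Foo) o);
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        List<Object> items = Arrays.asList("2015-05-06T13:24:20Z",
                new Foo(), new Foo(), 1, 2, 3, new Foo(), new Foo());
        Parsed p = parse(items);
        System.out.println(p.timestamp + " insert=" + p.toInsert.size()
                + " delete=" + p.toDelete + " upsert=" + p.toUpsert.size());
    }
}
```

For the sample sequence above this yields two inserts, the ids 1, 2, 3 to delete, and two upserts.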
My current implementation looks like this:
public void updateFoos(final CallBack callBack) {
    final String lastFooUpdateTimestamp = localStorage.getLastFooUpdateTimestamp();
    fooService.getFoos(lastFooUpdateTimestamp)
            .subscribe(new Subscriber<Object>() {
                @Override
                public void onCompleted() {
                    callBack.onSuccess();
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onNext(Object o) {
                    if (o instanceof String) {
                        localStorage.setLastFooUpdateTimestamp((String) o);
                    }
                    if (o instanceof Foo) {
                        databaseManager.save((Foo) o);
                    }
                }
            });
}
There's a number of issues:
Upvotes: 3
Views: 270
Reputation: 69997
Here is an example of how it could be done with RxJava:
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.functions.Func1;

public class MultikindSource {
    enum ValueType {
        TIMESTAMP,
        NUMBER,
        FOO
    }

    static final class Foo { }

    // Emits a timestamp, some Foo objects and some ids, mimicking the service response.
    static Observable<Object> source(String timestamp) {
        return Observable.from(Arrays.asList(timestamp, new Foo(), new Foo(),
                1, 2, 3, new Foo()));
    }

    public static void main(String[] args) {
        // Classifies each emitted item by its runtime type.
        Func1<Object, ValueType> keySelector = o -> {
            if (o instanceof String) {
                return ValueType.TIMESTAMP;
            } else if (o instanceof Foo) {
                return ValueType.FOO;
            }
            return ValueType.NUMBER;
        };

        AtomicReference<String> lastTimestamp = new AtomicReference<>(
                "2015-05-08T11:38:00.000Z");

        source(lastTimestamp.get())
                .groupBy(keySelector)
                .flatMap(g -> {
                    // Each group gets its own subscriber that does the actual work.
                    if (g.getKey() == ValueType.TIMESTAMP) {
                        g.subscribe(v -> {
                            System.out.println("Updating timestamp to " + v);
                            lastTimestamp.set((String) v);
                        });
                    } else if (g.getKey() == ValueType.FOO) {
                        // Collect Foo objects into batches of two.
                        g.buffer(2).subscribe(v ->
                                System.out.println("Batch inserting " + v));
                    } else {
                        g.subscribe(v ->
                                System.out.println("Got some number: " + v));
                    }
                    // Emit one marker per group so the groups can be counted.
                    return Observable.just(1);
                })
                .count()
                .subscribe(v ->
                        System.out.println("Got " + v + " kinds of events."));
    }
}
Essentially, you group the source data by some enum, then chain onto these groups and subscribe to them to perform the work.
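To show the grouping step on its own, here is a minimal sketch that swaps RxJava for plain `java.util.stream`, so it runs without any dependency. It is not equivalent to the reactive version: it collects the whole list eagerly instead of reacting to items as they arrive. The class name `GroupByKindSketch` and the helpers `kindOf` and `group` are hypothetical; the enum mirrors the one in the example above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByKindSketch {
    enum ValueType { TIMESTAMP, NUMBER, FOO }

    // Empty stand-in for the real Foo entity.
    static final class Foo { }

    // Same classification rule as the keySelector: decide by runtime type.
    static ValueType kindOf(Object o) {
        if (o instanceof String) {
            return ValueType.TIMESTAMP;
        }
        if (o instanceof Foo) {
            return ValueType.FOO;
        }
        return ValueType.NUMBER;
    }

    // Buckets the items by kind; each bucket keeps the original order.
    static Map<ValueType, List<Object>> group(List<Object> items) {
        return items.stream()
                .collect(Collectors.groupingBy(GroupByKindSketch::kindOf));
    }

    public static void main(String[] args) {
        List<Object> items = Arrays.asList("2015-05-08T11:38:00.000Z",
                new Foo(), new Foo(), 1, 2, 3, new Foo());
        Map<ValueType, List<Object>> groups = group(items);
        System.out.println("Timestamp: " + groups.get(ValueType.TIMESTAMP));
        System.out.println("Numbers: " + groups.get(ValueType.NUMBER));
        System.out.println("Foo count: " + groups.get(ValueType.FOO).size());
    }
}
```

Once the items are bucketed, each bucket can be handed to its own handler (timestamp update, batch insert, delete), which is the role the per-group subscribers play in the RxJava version.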
Upvotes: 4