Reputation: 39836
On my Android project I rely heavily on RxJava2, SqlBrite (with RxJavaInterop) and SqlDelight.
I have one Rx stream that is supposed to run indefinitely (until my service stops), and on it I have a .flatMap of Function<String, ObservableSource<Action>>.
That is, this flatMap contains a Subject<Action>, receives a String actionId, does some processing on that actionId (irrelevant to the question) and, depending on a condition, should query the database for the Action object and dispatch it to the subject.
My first approach was to do the query directly:
Cursor c = db.query(...);
if (c.moveToFirst()) {
    Action a = Action.SELECT_ALL_MAPPER.map(c);
    subject.onNext(a);
}
But this blocks the running thread, and I would rather trigger this on its own stream, which should do the following:
- query the database for the Action object and push the value to the subject
- the subject cannot receive terminate or error. It must stay alive for future events.
My current approach is the following code:
RxJavaInterop.toV2Observable(db.createQuery(
        Action.TABLE_NAME,
        Action.FACTORY.Select_by_id(actionId).statement)
        .mapToOne(new Func1<Cursor, Action>() {
            @Override public Action call(Cursor cursor) {
                return Action.SELECT_ALL_MAPPER.map(cursor);
            }
        }))
    .take(1)
    .subscribe(new Consumer<Action>() {
        @Override public void accept(Action action) throws Exception {
            subject.onNext(action);
        }
    });
Although this seems to do the trick at first glance, I see a few problems with it:
Consumer<Action> because it "might have not been initialised" (I understand the reason, that's OK).
So the question:
How can I do that?
Upvotes: 0
Views: 232
Reputation: 5521
I rather trigger this on its own stream
Take a look at RxAndroid. This could look like:
yourRxStream
    .flatMap(*db request here*)
    .subscribeOn(Schedulers.io())
    .subscribe(subject);
the subject cannot receive terminate or error. It must stay alive for future events.
Switch the Subject with a Relay:
Subjects are useful to bridge the gap between non-Rx APIs. However, they are stateful in a damaging way: when they receive an onComplete or onError they no longer become usable for moving data. This is the observable contract and sometimes it is the desired behavior. Most times it is not.
Relays are simply Subjects without the aforementioned property. They allow you to bridge non-Rx APIs into Rx easily, and without the worry of accidentally triggering a terminal state.
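To make the quoted idea concrete, here is a minimal plain-Java sketch of what "a Subject without a terminal state" means. MiniRelay and RelayDemo are hypothetical names invented for illustration, not the RxRelay API; in the RxRelay library the corresponding entry point is accept(...) on a relay such as PublishRelay, which likewise exposes no onError or onComplete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a relay; NOT the RxRelay API.
final class MiniRelay<T> implements Consumer<T> {
    private final List<Consumer<? super T>> listeners = new CopyOnWriteArrayList<>();

    // Register a listener; it receives every value accepted from now on.
    void subscribe(Consumer<? super T> listener) {
        listeners.add(listener);
    }

    // The only way in is to push a value. There is deliberately no
    // onError/onComplete, so no caller can put the relay in a terminal state.
    @Override
    public void accept(T value) {
        for (Consumer<? super T> listener : listeners) {
            listener.accept(value);
        }
    }
}

final class RelayDemo {
    // Pushes two values and returns what a single listener observed.
    static List<String> run() {
        MiniRelay<String> relay = new MiniRelay<>();
        List<String> seen = new ArrayList<>();
        relay.subscribe(seen::add);
        relay.accept("action-1");
        relay.accept("action-2");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the type has no terminal methods at all, a failing or completing inner query cannot accidentally end the long-lived stream; it can only choose not to push a value.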
Finally, for the request that could output 0 or 1 items, use a Maybe.
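As a rough illustration of the "0 or 1 item" shape, the sketch below uses only the JDK: Optional stands in for the zero-or-one result and CompletableFuture for running the lookup off the calling thread. This is an analogy, not RxJava or SqlBrite code; ZeroOrOneLookup, TABLE, findById and dispatch are hypothetical names, and the in-memory map is an assumed stand-in for the Action table. In RxJava 2 itself, Maybe signals these outcomes through onSuccess, onComplete (no item) and onError.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class ZeroOrOneLookup {
    // Hypothetical in-memory stand-in for the Action table.
    private static final Map<String, String> TABLE =
            Collections.singletonMap("id-1", "Action#1");

    // Zero-or-one result: an empty Optional means "no row", not an error.
    static Optional<String> findById(String actionId) {
        return Optional.ofNullable(TABLE.get(actionId));
    }

    // Runs the lookup off the calling thread and forwards a hit to the sink.
    // A miss forwards nothing, and a failure is swallowed here, so the sink
    // only ever receives values and never a terminal event.
    static CompletableFuture<Void> dispatch(String actionId, Consumer<String> sink) {
        return CompletableFuture
                .supplyAsync(() -> findById(actionId))
                .thenAccept(result -> result.ifPresent(sink))
                .exceptionally(error -> null);
    }

    // Dispatches one existing id and one missing id, then returns what arrived.
    static List<String> run() {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        dispatch("id-1", received::add).join();
        dispatch("missing", received::add).join();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the design is that "no row found" is an ordinary outcome rather than an exception, so the long-lived consumer simply sees nothing for that id.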
Upvotes: 1