Budius
Budius

Reputation: 39836

Query RxJava2 Db into another subject

On my Android project I'm heavily relying on RxJava2, SqlBrite(with RxJavaInterop) and SqlDelight.

I got one rx stream that is supposed to go indefinitely (until my service stops) and on it I have a .flatMap of Function<String, ObservableSource<Action>>.

Meaning, this flatMap contains a Subject<Action>, will receive String actionId, do some (irrelevant for the question) processing on those actionId, and depending on condition should query the database for the Action object and dispatch it to the subject

My first approach was to do the query directly:

Cursor c = db.query(...);
if(c.moveFirst()) {
    Action a = Action.SELECT_ALL_MAPPER.map(c);
    subject.onNext(selectAll);
}

But this blocks the running thread and I rather trigger this on its own stream that should do the following:

My current approach is the following code:

RxJavaInterop.toV2Observable(db.createQuery(
    Action.TABLE_NAME,
    Action.FACTORY.Select_by_id(actionId).statement)
    .mapToOne(new Func1<Cursor, Action>() {
        @Override public Action call(Cursor cursor) {
            return Action.SELECT_ALL_MAPPER.map(cursor);
        }
    }))
    .take(1)
    .subscribe(new Consumer<Action>() {
        @Override public void accept(Action action) throws Exception {
            subject.onNext(action);
        }
    });

And although this seems to do the trick on the first impression, I see a few errors on it:

So the question:

How can I do that?

Upvotes: 0

Views: 232

Answers (1)

Geoffrey Marizy
Geoffrey Marizy

Reputation: 5521

I rather trigger this on its own stream

Take a look at RxAndroid. This could look like:

yourRxStream
    .flatMap(*db request here*)
    .subscribeOn(Schedulers.io())
    .subcribe(subject);

the subject cannot receive terminate or error. It must stay alive for future events.

Switch the Subject with a Relay:

Subjects are useful to bridge the gap between non-Rx APIs. However, they are stateful in a damaging way: when they receive an onComplete or onError they no longer become usable for moving data. This is the observable contract and sometimes it is the desired behavior. Most times it is not.

Relays are simply Subjects without the aforementioned property. They allow you to bridge non-Rx APIs into Rx easily, and without the worry of accidentally triggering a terminal state.


Finally for the request than could output 0 or 1 item, use a Maybe.

Upvotes: 1

Related Questions