Rovdjuret
Rovdjuret

Reputation: 1528

How to call non-observable code from rxjava chain

I've implemented a rxjava chain of calls in my presenter. What it does is it calls remote server with retrofit if there are none results returned from sqlite database.

However, everything is using rxjava except our repository which calls sqlite. When the sqlite returns a result it shows me for example 3 results with null values. It seems it doesn't work to call non observable code from rxjava chain? How can I do it without using StorIO or Brite?

Result looks like this

enter image description here

Presenter

@Override
protected void onCreate(Bundle savedState) {
    super.onCreate(savedState);

    restartableLatestCache(REQUEST_ASSIGNMENTS,
        () -> mRepository.query()
                    .subscribeOn(Schedulers.io())
                    .observeOn(mainThread()),
        (assignmentActivity, response) -> assignmentActivity.onSuccess(response),
        (assignmentActivity, throwable) -> assignmentActivity.onError(throwable)
    );
}

Repository

return mAssignmentLocalDataStore.query(new AssignmentSpecification())
            .flatMap(assignments -> assignments == null || assignments.isEmpty() ?
                    mAssignmentRemoteDataStore.query()
                        .flatMap(remoteAssignments ->
                            Observable.zip(
                                    mEntityRepository.query()
                                            .flatMap(mEntityRepository::add),
                                    mFacilityRepository.query()
                                            .flatMap(mFacilityRepository::add),
                                    mAssignmentLocalDataStore.query(new AssignmentSpecification()),
                                    (remoteEntities, remoteFacilities, assignmentsRetry) -> assignmentsRetry
                            )
                        ): Observable.just(assignments)
            );

SQLite LocalDataStore

@Override
public Observable<List<Assignment>> query(Specification specification) {
    final SqlSpecification sqlSpecification = (SqlSpecification) specification;

    final SQLiteDatabase database = mOpenHelper.getReadableDatabase();
    final List<Assignment> assignments = new ArrayList<>();

    try {
        final Cursor cursor = database.rawQuery(sqlSpecification.toSqlQuery(), new String[]{});

        for (int i = 0, size = cursor.getCount(); i < size; i++) {
            cursor.moveToPosition(i);

            assignments.add(mToAssignmentMapper.map(cursor));
        }

        cursor.close();

        return Observable.just(assignments);

    } finally {
        database.close();
    }
}

Upvotes: 0

Views: 194

Answers (1)

John O&#39;Reilly
John O&#39;Reilly

Reputation: 10330

Change query to something like following (using Observable.create). Another variation of this would be to return Observable<Assignment> and call subscriber.onNext for each record.

@Override 
public Observable<List<Assignment>> query(Specification specification) {
    return Observable.create(subscriber -> {
            final SqlSpecification sqlSpecification = (SqlSpecification) specification;

            final SQLiteDatabase database = mOpenHelper.getReadableDatabase();
            final List<Assignment> assignments = new ArrayList<>();

            try { 
                final Cursor cursor = database.rawQuery(sqlSpecification.toSqlQuery(), new String[]{});

                for (int i = 0, size = cursor.getCount(); i < size; i++) {
                    cursor.moveToPosition(i);

                    assignments.add(mToAssignmentMapper.map(cursor));
                } 

                subscriber.onNext(assignments);

                cursor.close();
            } finally { 
                database.close();
                subscriber.onCompleted(); 
            } 
    }
} 

Upvotes: 1

Related Questions