Hector

Reputation: 5664

How to iterate through a list with RxJava and perform initial processing on the first item

I am new to RxJava and finding it very useful for network and database processing within my Android applications.

I have two use cases that I cannot implement completely in RxJava.

Use Case 1

  1. Clear down my target database table Table A
  2. Fetch a list of database records from Table B that contain a key field
  3. For each row retrieved from Table B, call a Remote API and persist all the returned data into Table A

The closest I have managed is this code:

final AtomicInteger id = new AtomicInteger(0);

DatabaseController.deleteAll(TableA_DO.class);

DatabaseController.fetchTable_Bs()
        .subscribeOn(Schedulers.io())
        .toObservable()
        .flatMapIterable(b -> b)
        .flatMap(b_record -> NetworkController.getTable_A_data(b_record.getKey()))
        .flatMap(network -> transformNetwork(id, network, NETWORK_B_MAPPER))
        .doOnNext(DatabaseController::persistRealmObjects)
        .doOnComplete(onComplete)
        .doOnError(onError)
        .doAfterTerminate(doAfterTerminate())
        .doOnSubscribe(compositeDisposable::add)
        .subscribe();

Use Case 2

  1. Clear down my target database table Table X
  2. Clear down my target database table Table Y
  3. Fetch a list of database records from Table Z that contain a key field
  4. For each row retrieved from Table Z, call a Remote API, persist some of the returned data into Table X, and persist the remainder into Table Y

I have not managed to create any code for use case 2.

I have a number of questions regarding the use of RxJava for these use cases.

  1. Is it possible to achieve both my use cases in RxJava?
  2. Is it "Best Practice" to combine all these steps into an Rx "Stream"?

UPDATE

I ended up with this POC test code, which seems to work. I am not sure it is the optimum solution. However, my API calls return Single and my database operations return Completable, so this approach fits my code best.

public class UseCaseOneA {

    public static void main(final String[] args) {

        login()
            .andThen(UseCaseOneA.deleteDatabaseTableA())
            .andThen(UseCaseOneA.deleteDatabaseTableB())
            .andThen(manufactureRecords())
            .flatMapIterable(x -> x)
            .flatMapSingle(record -> NetworkController.callApi(record.getPrimaryKey()))
            .flatMapSingle(z -> transform(z))
            .flatMapCompletable(p -> UseCaseOneA.insertDatabaseTableA(p))
            .doOnComplete(() -> System.out.println("ON COMPLETE"))
            .doFinally(() -> System.out.println("ON FINALLY"))
            .subscribe();

    }

    private static Single<List<PayloadDO>> transform(final List<RemotePayload> payloads) {
        return Single.create(new SingleOnSubscribe<List<PayloadDO>>() {
            @Override
            public void subscribe(final SingleEmitter<List<PayloadDO>> emitter) throws Exception {
                System.out.println("transform - " + payloads.size());

                final List<PayloadDO> payloadDOs = new ArrayList<>();

                for (final RemotePayload remotePayload : payloads) {
                    payloadDOs.add(new PayloadDO(remotePayload.getPayload()));
                }

                emitter.onSuccess(payloadDOs);
            }
        });
    }

    private static Observable<List<Record>> manufactureRecords() {
        final List<Record> records = new ArrayList<>();
        records.add(new Record("111-111-111"));
        records.add(new Record("222-222-222"));
        records.add(new Record("3333-3333-3333"));
        records.add(new Record("44-444-44444-44-4"));
        records.add(new Record("5555-55-55-5-55-5555-5555"));

        return Observable.just(records);
    }

    private static Completable deleteDatabaseTableA() {

        return Completable.create(new CompletableOnSubscribe() {

            @Override
            public void subscribe(final CompletableEmitter emitter) throws Exception {
                System.out.println("deleteDatabaseTableA");

                emitter.onComplete();
            }
        });
    }

    private static Completable deleteDatabaseTableB() {

        return Completable.create(new CompletableOnSubscribe() {

            @Override
            public void subscribe(final CompletableEmitter emitter) throws Exception {
                System.out.println("deleteDatabaseTableB");

                emitter.onComplete();
            }
        });
    }

    private static Completable insertDatabaseTableA(final List<PayloadDO> payloadDOs) {
        return Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(final CompletableEmitter emitter) throws Exception {
                System.out.println("insertDatabaseTableA - " + payloadDOs);

                emitter.onComplete();
            }
        });
    }

    private static Completable login() {
        return Completable.complete();
    }
}

This code doesn't address all my use case requirements. In particular, I cannot yet transform the remote payload records into multiple database record types and insert each type into its own target database table.

I could call the Remote API twice to get the same remote data items, transforming them into one database type the first time and into the second database type the second time. However, that seems wasteful.

Is there an operator in RxJava that lets me reuse the output from my API calls and transform it into another database type?
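To make the reuse being asked about concrete, here is a minimal plain-Java sketch with no RxJava in it: one API response is walked once and produces rows of both database types. Every name in it (SplitPayloads, RemotePayload with a key field, TableXRow, TableYRow, SplitResult, split) is a hypothetical stand-in, not a type from the code above, and which fields go to which table is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitPayloads {

    // Hypothetical stand-in for one item returned by the Remote API.
    static final class RemotePayload {
        final String key;
        final String payload;

        RemotePayload(String key, String payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    // Hypothetical row type destined for Table X.
    static final class TableXRow {
        final String key;

        TableXRow(String key) {
            this.key = key;
        }
    }

    // Hypothetical row type destined for Table Y.
    static final class TableYRow {
        final String payload;

        TableYRow(String payload) {
            this.payload = payload;
        }
    }

    // Holds both transformed lists so a single API response can feed two inserts.
    static final class SplitResult {
        final List<TableXRow> xRows = new ArrayList<>();
        final List<TableYRow> yRows = new ArrayList<>();
    }

    // Walks the API response once and builds one row of each type per payload.
    static SplitResult split(List<RemotePayload> payloads) {
        SplitResult result = new SplitResult();
        for (RemotePayload p : payloads) {
            result.xRows.add(new TableXRow(p.key));
            result.yRows.add(new TableYRow(p.payload));
        }
        return result;
    }

    public static void main(String[] args) {
        List<RemotePayload> response = new ArrayList<>();
        response.add(new RemotePayload("111-111-111", "first"));
        response.add(new RemotePayload("222-222-222", "second"));

        SplitResult result = split(response);
        System.out.println("rows for Table X: " + result.xRows.size());
        System.out.println("rows for Table Y: " + result.yRows.size());
    }
}
```

In a chain like the POC above, a step of this shape could presumably take the place of transform, with the insert step then writing xRows and yRows through two Completables chained with andThen. That wiring is an assumption and is not shown here.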

Upvotes: 0

Views: 661

Answers (1)

akarnokd

Reputation: 69997

You have to index the items yourself in some manner, for example, via external counting:

Observable.defer(() -> {
    AtomicInteger counter = new AtomicInteger();

    return DatabaseController.fetchTable_Bs()
        .subscribeOn(Schedulers.io())
        .toObservable()
        .flatMapIterable(b -> b)
        .doOnNext(item -> {
            if (counter.getAndIncrement() == 0) {
                // this is the very first item
            } else {
                // these are the subsequent items
            }
        });
});

The defer is necessary to isolate the counter to the inner sequence: each subscription gets its own counter, so the first-item check still works if the sequence is repeated.
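The effect of isolating the counter can be shown with a plain-Java analogy that uses a Supplier in place of an Observable. This is only a sketch of the idea, not RxJava code, and the names DeferredCounterSketch, label, shared and deferred are made up for the illustration. Each call to get() stands in for one subscription.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class DeferredCounterSketch {

    // Labels each item FIRST or NEXT depending on the counter's current value.
    static List<String> label(List<String> items, AtomicInteger counter) {
        List<String> out = new ArrayList<>();
        for (String item : items) {
            out.add((counter.getAndIncrement() == 0 ? "FIRST:" : "NEXT:") + item);
        }
        return out;
    }

    // Shared counter: created once, so only the very first run ever sees 0.
    static Supplier<List<String>> shared(List<String> items) {
        AtomicInteger counter = new AtomicInteger();
        return () -> label(items, counter);
    }

    // Deferred counter: created inside the supplier, so every run starts at 0.
    static Supplier<List<String>> deferred(List<String> items) {
        return () -> label(items, new AtomicInteger());
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b");
        Supplier<List<String>> sharedRun = shared(items);
        Supplier<List<String>> deferredRun = deferred(items);

        // First run of each; both label "a" as FIRST.
        sharedRun.get();
        deferredRun.get();

        // Second run: only the deferred variant still recognises the first item.
        System.out.println("shared, second run:   " + sharedRun.get());
        System.out.println("deferred, second run: " + deferredRun.get());
    }
}
```

On the second run the shared variant labels every item NEXT, while the deferred variant labels "a" as FIRST again. That is the behaviour the defer wrapper preserves when the sequence is subscribed to more than once.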

Upvotes: 1
