Reputation: 5664
I am new to RxJava and finding it very useful for network and database processing within my Android applications.
I have two use cases that I cannot implement completely in RxJava
Use Case 1
The closest I have managed is this code
final AtomicInteger id = new AtomicInteger(0);
DatabaseController.deleteAll(TableA_DO.class);
DatabaseController.fetchTable_Bs()
.subscribeOn(Schedulers.io())
.toObservable()
.flatMapIterable(b -> b)
.flatMap(b_record -> NetworkController.getTable_A_data(b_record.getKey()))
.flatMap(network -> transformNetwork(id, network, NETWORK_B_MAPPER))
.doOnNext(DatabaseController::persistRealmObjects)
.doOnComplete(onComplete)
.doOnError(onError)
.doAfterTerminate(doAfterTerminate())
.doOnSubscribe(compositeDisposable::add)
.subscribe();
Use Case 2
I have not managed to create any code for use case 2.
I have a number of questions regarding the use of RxJava for these use cases.
UPDATE
I ended up with this POC test code which seems to work... I am not sure if its the optimum solution however My API calls return Single and my database operations return Completable so I feel like this is the best solution for me.
public class UseCaseOneA {
public static void main(final String[] args) {
login()
.andThen(UseCaseOneA.deleteDatabaseTableA())
.andThen(UseCaseOneA.deleteDatabaseTableB())
.andThen(manufactureRecords())
.flatMapIterable(x -> x)
.flatMapSingle(record -> NetworkController.callApi(record.getPrimaryKey()))
.flatMapSingle(z -> transform(z))
.flatMapCompletable(p -> UseCaseOneA.insertDatabaseTableA(p))
.doOnComplete(() -> System.out.println("ON COMPLETE"))
.doFinally(() -> System.out.println("ON FINALLY"))
.subscribe();
}
private static Single<List<PayloadDO>> transform(final List<RemotePayload> payloads) {
return Single.create(new SingleOnSubscribe<List<PayloadDO>>() {
@Override
public void subscribe(final SingleEmitter<List<PayloadDO>> emitter) throws Exception {
System.out.println("transform - " + payloads.size());
final List<PayloadDO> payloadDOs = new ArrayList<>();
for (final RemotePayload remotePayload : payloads) {
payloadDOs.add(new PayloadDO(remotePayload.getPayload()));
}
emitter.onSuccess(payloadDOs);
}
});
}
private static Observable<List<Record>> manufactureRecords() {
final List<Record> records = new ArrayList<>();
records.add(new Record("111-111-111"));
records.add(new Record("222-222-222"));
records.add(new Record("3333-3333-3333"));
records.add(new Record("44-444-44444-44-4"));
records.add(new Record("5555-55-55-5-55-5555-5555"));
return Observable.just(records);
}
private static Completable deleteDatabaseTableA() {
return Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(final CompletableEmitter emitter) throws Exception {
System.out.println("deleteDatabaseTableA");
emitter.onComplete();
}
});
}
private static Completable deleteDatabaseTableB() {
return Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(final CompletableEmitter emitter) throws Exception {
System.out.println("deleteDatabaseTableB");
emitter.onComplete();
}
});
}
private static Completable insertDatabaseTableA(final List<PayloadDO> payloadDOs) {
return Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(final CompletableEmitter emitter) throws Exception {
System.out.println("insertDatabaseTableA - " + payloadDOs);
emitter.onComplete();
}
});
}
private static Completable login() {
return Completable.complete();
}
}
This code doesn't address all my use case requirements. Namely being able to transform the remote payload records into multiple Database record types and insert each type into its own specific target databased table.
I could just call the Remote API twice to get the same remote data items and transform first into one database type then secondly into the second database type, however that seems wasteful.
Is there an operand in RxJava where I can reuse the output from my API calls and transform them into another database type?
Upvotes: 0
Views: 661
Reputation: 69997
You have to index the items yourself in some manner, for example, via external counting:
Observable.defer(() -> {
AtomicInteger counter = new AtomicInteger();
return DatabaseController.fetchTable_Bs()
.subscribeOn(Schedulers.io())
.toObservable()
.flatMapIterable(b -> b)
.doOnNext(item -> {
if (counter.getAndIncrement() == 0) {
// this is the very first item
} else {
// these are the subsequent items
}
});
});
The defer
is necessary to isolate the counter to the inner sequence so that repetition still works if necessary.
Upvotes: 1