devz

Reputation: 2819

RxJava dependent method calls with async for loop

I have multiple dependent calls that I would like to chain with RxJava:

  1. Get all entities that have a file that is not yet uploaded
  2. Upload the file to the server
  3. Update the entity with the received url
  4. Upload the entity to the server

I tried different approaches but couldn't make it work. I can't work out how to wait until the file upload has finished. Here is my current code:

Observable.fromIterable(repository.getFileNotUploaded()) // Returns a list of all entities that should be uploaded to the server
    .flatMap(entity ->
            restService.uploadFile(new File(directory.getPath(), entity.getLocalPath()))
                    .subscribe(fileUrl -> {
                        entity.setFileUrl(fileUrl);
                        repository.update(entity);
                    }));
// TODO: Wait until all files have been uploaded and the entities have been stored
// locally. Then upload the list of all entities. 

Rest call:

public Observable<String> uploadFile(File file) {

    return Observable.create(emitter -> {
        AsyncTask.execute(new Runnable() {
            @Override
            public void run() {
                commClient.sendFileRequest(URL, file,
                        response -> {
                            // ...
                            if (success) {
                                emitter.onNext(new String(response.data));
                            }

                            emitter.onComplete();
                        });
            }
        });
    });
}

I also read that calling subscribe inside a flatMap is an anti-pattern. How can I chain my method calls? Should I use the range method and return the current position of my iteration?


Edit - Working solution thanks to Emanuel S

Emanuel S's solution works. I also had to change my REST service: it must return an Observable<Entity> instead of an Observable<String>. Also note that one should not mix AsyncTask and Rx.

public Observable<Entity> uploadFile(File file, Entity entity) {
//...
entity.setFileUrl(fileUrl);
emitter.onNext(entity);
//...

Upvotes: 0

Views: 1419

Answers (1)

Emanuel

Reputation: 8106

If repository.getFileNotUploaded() returns an ArrayList or another Collection, create your Observable from that collection and then iterate over its items (for example with just followed by flatMapIterable).

This may work (untested, written without an IDE) and uploads all your entities in one batch.

As akarnokd wrote, you don't need just and flatMapIterable, since you can simply use fromIterable.

Observable.just(repository.getFileNotUploaded()).flatMapIterable(files -> files)
// OR
Observable.fromIterable(repository.getFileNotUploaded())

.flatMap(entity -> rs.uploadFile(new File(yourPath, entity.getLocalPath())) // for each entity, run the upload observable
    .map(fileUrl -> { entity.setFileUrl(fileUrl); return entity; })) // store the returned URL on the entity and emit the entity
.toList() // collect all updated entities into one list
.flatMap(entityList -> uploadEntityObservable(entityList)) // in RxJava 2, toList() returns a Single, so use flatMapObservable here if uploadEntityObservable returns an Observable
.subscribe(
    success -> Log.d("Success", "All entities and files uploaded"),
    error -> Log.e("Error", "error happened somewhere")
);
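
To make the chain above concrete, here is a minimal self-contained sketch. It assumes RxJava 2 (the io.reactivex package); the Entity class, the stubbed uploadFile and uploadEntities methods, and the example.invalid URL are hypothetical stand-ins for the repository and REST service in the question, not their real implementations.

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;

public class UploadChainSketch {

    // Hypothetical stand-in for the entity from the question.
    static class Entity {
        final String localPath;
        String fileUrl;
        Entity(String localPath) { this.localPath = localPath; }
    }

    // Stub for restService.uploadFile: emits a fake URL for the given path.
    static Observable<String> uploadFile(String localPath) {
        return Observable.fromCallable(() -> "https://example.invalid/" + localPath);
    }

    // Stub for the final call: pretends to upload the list and emits its size.
    static Observable<Integer> uploadEntities(List<Entity> entities) {
        return Observable.just(entities.size());
    }

    static Observable<Integer> uploadAll(List<Entity> pending) {
        return Observable.fromIterable(pending)
                // Upload each file, then store the returned URL on its entity.
                .flatMap(entity -> uploadFile(entity.localPath)
                        .map(fileUrl -> {
                            entity.fileUrl = fileUrl;
                            return entity;
                        }))
                // toList() emits only after every inner upload has completed.
                .toList()
                // toList() yields a Single in RxJava 2, hence flatMapObservable.
                .flatMapObservable(UploadChainSketch::uploadEntities);
    }

    public static void main(String[] args) {
        List<Entity> pending = Arrays.asList(new Entity("a.jpg"), new Entity("b.jpg"));
        Integer uploaded = uploadAll(pending).blockingSingle();
        System.out.println(uploaded + " entities uploaded");
        for (Entity e : pending) {
            System.out.println(e.localPath + " -> " + e.fileUrl);
        }
    }
}
```

The nested map keeps the entity in scope, so the URL can be attached without subscribing inside flatMap. blockingSingle() is used only to keep the demo synchronous; in an app you would subscribe as shown above.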

If you want to upload each modified entity with its own call, you may want to replace

.toList() // collect all updated entities into one list
.flatMap(entityList -> uploadEntityObservable(entityList ))

with

.flatMap(singleEntity -> uploadSingleEntity(singleEntity)) // assumes uploadSingleEntity returns an Observable

And don't mix AsyncTask with RxJava. You don't need AsyncTask if you have RxJava.
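
As a rough sketch of what that could look like, the callback-based request can be wrapped with Observable.create and moved off the calling thread with subscribeOn. This assumes RxJava 2; FakeClient and Callback are hypothetical stand-ins for commClient, and reporting a failed upload through onError is a design choice of this sketch, not something taken from the question.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class UploadWithoutAsyncTask {

    // Hypothetical callback interface standing in for the client's response listener.
    interface Callback {
        void onResponse(boolean success, byte[] data);
    }

    // Hypothetical client: answers immediately with a fake URL.
    static class FakeClient {
        void sendFileRequest(String path, Callback callback) {
            byte[] body = ("https://example.invalid/" + path).getBytes(StandardCharsets.UTF_8);
            callback.onResponse(true, body);
        }
    }

    static Observable<String> uploadFile(FakeClient client, String path) {
        return Observable.<String>create(emitter ->
                client.sendFileRequest(path, (success, data) -> {
                    if (success) {
                        emitter.onNext(new String(data, StandardCharsets.UTF_8));
                        emitter.onComplete();
                    } else {
                        // Surface failures instead of completing silently.
                        emitter.onError(new IOException("upload failed: " + path));
                    }
                }))
                // The request is started on an I/O thread, so no AsyncTask is needed.
                .subscribeOn(Schedulers.io());
    }

    public static void main(String[] args) {
        String url = uploadFile(new FakeClient(), "a.jpg").blockingFirst();
        System.out.println(url);
    }
}
```

On Android you would typically add observeOn with a main-thread scheduler before subscribing if the result touches the UI.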

Take care: if your repository itself returns an Observable that emits the data as a stream, you need to use

repository.getFileNotUploaded() // Observable<Whatever> that emits the data as a stream
.flatMapIterable(files -> files) ...
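
A small sketch of that case, assuming RxJava 2 and a hypothetical repository method that emits lists of file names in batches (the real repository in the question may emit something else):

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;

public class StreamingRepositorySketch {

    // Hypothetical repository call: emits two batches instead of returning one list.
    static Observable<List<String>> getFileNotUploaded() {
        return Observable.just(
                Arrays.asList("a.jpg", "b.jpg"),
                Arrays.asList("c.jpg"));
    }

    // flatMapIterable unpacks every emitted batch into its individual items.
    static List<String> flatten() {
        return getFileNotUploaded()
                .flatMapIterable(files -> files)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(flatten());
    }
}
```

After flatMapIterable the stream carries single items, so the per-entity flatMap from the chain above can follow directly.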

Upvotes: 2
