Sunny
Sunny

Reputation: 1076

Android Retrofit parallel file download issue

I have an usecase in my application for downloading two zip files from the server. For that i have been using retrofit+rxjava(created two seperate retrofit services). For parallel I execution have been subscribing both retrofit service in new thread and later combining it using zip operator. It's working fine. But later I added map operator to both service for unzipping operation but it not executing the code written in the map operator and the control is passing directly to zip operation. I don't how to tackle this and i am newbie in reactive world.

What I have been tried so far

    Observable<Response<ResponseBody>> dFileObservable = dbDownloadApi.downloadDealerData(WebServiceConstants.ACTION_DEALER_DATA,
            params.getDealerNumber(),params.getUserId(),params.getClientId(), params.getSessionId()).subscribeOn(Schedulers.newThread());
    dFileObservable.map(new Function<Response<ResponseBody>, String>() {
        @Override
        public String apply(Response<ResponseBody> responseBody) throws Exception {
            String header = responseBody.headers().get("Content-Disposition");
            String filename = header.replace("attachment; filename=", "");
            String downloadFolderPath = fileManager.makeAndGetDownloadFolderPath();
            String dealerZipPath = fileManager.makeFolder(downloadFolderPath, StrConstants.DEALER_FOLDER_NAME);
            fileManager.writeDownloadedFileToDisk(dealerZipPath,filename, responseBody.body().source());
            String dealerFilePath = dealerZipPath+File.separator+filename;
            unzipUtility.unzip(dealerFilePath, fileManager.makeAndGetDownloadFolderPath()+File.separator+ StrConstants.GENERAL_FOLDER_NAME);
            return dealerFilePath;
        }
    });

   Observable<Response<ResponseBody>> generalFileObservable = dbDownloadApi.downloadGeneralData(WebServiceConstants.ACTION_GENERAL_DATA,
            params.getDealerNumber(),params.getUserId(),params.getClientId(), params.getSessionId()).subscribeOn(Schedulers.newThread());;
    generalFileObservable.map(new Function<Response<ResponseBody>, String>() {
        @Override
        public String apply(Response<ResponseBody> responseBody) throws Exception {
            String header = responseBody.headers().get("Content-Disposition");
            String filename = header.replace("attachment; filename=", "");
            String downloadFolderPath = fileManager.makeAndGetDownloadFolderPath();
            String generalZipPath = fileManager.makeFolder(downloadFolderPath, StrConstants.GENERAL_FOLDER_NAME);
            fileManager.writeDownloadedFileToDisk(generalZipPath,filename, responseBody.body().source());
            String generalFilePath = generalZipPath+File.separator+filename;
            unzipUtility.unzip(generalFilePath, fileManager.makeAndGetDownloadFolderPath()+File.separator+ StrConstants.GENERAL_FOLDER_NAME);
            return generalFilePath;
        }
    });

   Observable<String> zipped = Observable.zip(dealerFileObservable, generalFileObservable, new BiFunction<Response<ResponseBody>, Response<ResponseBody>, String>() {
        @Override
        public String apply(Response<ResponseBody> responseBodyResponse, Response<ResponseBody> responseBodyResponse2) throws Exception {
            System.out.println("zipped yess");
            return null;
        }
    }).observeOn(Schedulers.io());

    zipped.subscribe(getObserver());

and the getObserver() function

    private Observer<String> getObserver(){

    return new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(String value) {

            System.out.println("------------total time-----------");
            System.out.println("result value-->"+value);
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    };
}

When the code executes the control is transferred to the apply() function in zip operator and the map operator in both observable is not getting executed.

And there is another question

I am merging/zipping the two observables and the type passed to the operator is Response<"ResponseBody">. Actually i need the downloaded file path there(string type)and for that what should i do?

**

Updated the Solution as described by @Yaroslav Stavnichiy and now its working

**

    Observable<String> deObservable =  dbDownloadApi.downloaddData(WebServiceConstants.ACTION_DATA,
            params.getNumber(),params.getId(),params.getCtId(), params.getSessionId())
            .flatMap(new Function<Response<ResponseBody>, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Response<ResponseBody> responseBody) throws Exception {
                    String zipPath = fileManager.processDownloadedFile(StrConstants.FOLDER_NAME,
                            StrConstants.FILE_NAME,responseBody.body().source());
                    return Observable.just(zipPath);
                }
            }).map(new Function<String, String>() {
                @Override
                public String apply(String filePath) throws Exception {
                    String unzipDestinationPath = fileManager.makeAndGetDownloadFolderPath()+
                            File.separator+ StrConstants.FOLDER_NAME;
                    unzipUtility.unzip(filePath, unzipDestinationPath);
                    return unzipDestinationPath;
                }
            }).subscribeOn(Schedulers.newThread());

Upvotes: 0

Views: 783

Answers (1)

Yaroslav Stavnichiy
Yaroslav Stavnichiy

Reputation: 21446

What you are effectively doing is:

Observable a = ...;
Observable b = ...;
a.map(...);
b.map(...);
Observable.zip(a, b).subscribe(f);

map() (as well as all other rx-operators) does not mutate the source. It returns new observable that you can use in further computations. In your code you are ignoring those returned objects. You are zipping original observables, not the mapped ones, that's why mapper functions do not get invoked.

I think you wanted to do the following:

Observable a = ... .map(...);
Observable b = ... .map(...);
Observable.zip(a, b).subscribe(f);

Upvotes: 2

Related Questions