Reputation: 1076
I have a use case in my application that requires downloading two zip files from the server. For that I have been using Retrofit + RxJava (with two separate Retrofit services). For parallel execution, I subscribe both Retrofit services on a new thread and later combine them using the zip operator. That works fine. Later I added a map operator to both services to perform the unzipping, but the code written in the map operator is not executed and control passes directly to the zip operator. I don't know how to tackle this, and I am a newbie in the reactive world.
What I have tried so far:
Observable<Response<ResponseBody>> dealerFileObservable = dbDownloadApi.downloadDealerData(WebServiceConstants.ACTION_DEALER_DATA,
params.getDealerNumber(),params.getUserId(),params.getClientId(), params.getSessionId()).subscribeOn(Schedulers.newThread());
dealerFileObservable.map(new Function<Response<ResponseBody>, String>() {
@Override
public String apply(Response<ResponseBody> responseBody) throws Exception {
String header = responseBody.headers().get("Content-Disposition");
String filename = header.replace("attachment; filename=", "");
String downloadFolderPath = fileManager.makeAndGetDownloadFolderPath();
String dealerZipPath = fileManager.makeFolder(downloadFolderPath, StrConstants.DEALER_FOLDER_NAME);
fileManager.writeDownloadedFileToDisk(dealerZipPath,filename, responseBody.body().source());
String dealerFilePath = dealerZipPath+File.separator+filename;
unzipUtility.unzip(dealerFilePath, fileManager.makeAndGetDownloadFolderPath()+File.separator+ StrConstants.GENERAL_FOLDER_NAME);
return dealerFilePath;
}
});
Observable<Response<ResponseBody>> generalFileObservable = dbDownloadApi.downloadGeneralData(WebServiceConstants.ACTION_GENERAL_DATA,
params.getDealerNumber(),params.getUserId(),params.getClientId(), params.getSessionId()).subscribeOn(Schedulers.newThread());
generalFileObservable.map(new Function<Response<ResponseBody>, String>() {
@Override
public String apply(Response<ResponseBody> responseBody) throws Exception {
String header = responseBody.headers().get("Content-Disposition");
String filename = header.replace("attachment; filename=", "");
String downloadFolderPath = fileManager.makeAndGetDownloadFolderPath();
String generalZipPath = fileManager.makeFolder(downloadFolderPath, StrConstants.GENERAL_FOLDER_NAME);
fileManager.writeDownloadedFileToDisk(generalZipPath,filename, responseBody.body().source());
String generalFilePath = generalZipPath+File.separator+filename;
unzipUtility.unzip(generalFilePath, fileManager.makeAndGetDownloadFolderPath()+File.separator+ StrConstants.GENERAL_FOLDER_NAME);
return generalFilePath;
}
});
Observable<String> zipped = Observable.zip(dealerFileObservable, generalFileObservable, new BiFunction<Response<ResponseBody>, Response<ResponseBody>, String>() {
@Override
public String apply(Response<ResponseBody> responseBodyResponse, Response<ResponseBody> responseBodyResponse2) throws Exception {
System.out.println("zipped yess");
return null;
}
}).observeOn(Schedulers.io());
zipped.subscribe(getObserver());
And the getObserver() function:
private Observer<String> getObserver(){
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
System.out.println("------------total time-----------");
System.out.println("result value-->"+value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
}
When the code runs, control goes straight to the apply() function of the zip operator; the map operators on both observables are never executed.
I also have another question.
I am zipping the two observables, and the type passed to the operator is Response<ResponseBody>. What I actually need there is the downloaded file path (a String). How should I do that?
**Update: I applied the solution described by @Yaroslav Stavnichiy and it is now working.**
Observable<String> deObservable = dbDownloadApi.downloaddData(WebServiceConstants.ACTION_DATA,
params.getNumber(),params.getId(),params.getCtId(), params.getSessionId())
.flatMap(new Function<Response<ResponseBody>, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Response<ResponseBody> responseBody) throws Exception {
String zipPath = fileManager.processDownloadedFile(StrConstants.FOLDER_NAME,
StrConstants.FILE_NAME,responseBody.body().source());
return Observable.just(zipPath);
}
}).map(new Function<String, String>() {
@Override
public String apply(String filePath) throws Exception {
String unzipDestinationPath = fileManager.makeAndGetDownloadFolderPath()+
File.separator+ StrConstants.FOLDER_NAME;
unzipUtility.unzip(filePath, unzipDestinationPath);
return unzipDestinationPath;
}
}).subscribeOn(Schedulers.newThread());
Upvotes: 0
Views: 783
Reputation: 21446
What you are effectively doing is:
Observable a = ...;
Observable b = ...;
a.map(...);
b.map(...);
Observable.zip(a, b).subscribe(f);
map() (as well as all other Rx operators) does not mutate the source. It returns a new observable that you can use in further computations. In your code you are ignoring those returned objects. You are zipping the original observables, not the mapped ones; that is why the mapper functions do not get invoked.
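To make this concrete, here is a minimal sketch in plain Java. `Source` is a hypothetical stand-in written for this example, not the RxJava `Observable` class; it only imitates the one relevant property, namely that `map()` returns a new object and leaves the receiver untouched. The `unzip` helper and the file names are made up as well.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

class MapReturnsNewSource {

    // Hypothetical stand-in for an Rx-style source (NOT the RxJava API):
    // it is immutable, and each operator returns a new Source.
    static final class Source<T> {
        private final Supplier<T> producer;

        Source(Supplier<T> producer) {
            this.producer = producer;
        }

        // Wraps this source in a new one; "this" is left untouched.
        <R> Source<R> map(Function<T, R> mapper) {
            return new Source<R>(() -> mapper.apply(producer.get()));
        }

        // Combines the values of two sources with the given function.
        static <A, B, R> Source<R> zip(Source<A> a, Source<B> b, BiFunction<A, B, R> zipper) {
            return new Source<R>(() -> zipper.apply(a.producer.get(), b.producer.get()));
        }

        // Runs the chain and returns its single value.
        T get() {
            return producer.get();
        }
    }

    // Counts how often the mapper runs, so the difference can be checked.
    static final AtomicInteger mapperCalls = new AtomicInteger();

    static String unzip(String file) {
        mapperCalls.incrementAndGet();
        return file + " (unzipped)";
    }

    // The pattern from the question: the results of map() are thrown away.
    static String zipOriginals() {
        Source<String> a = new Source<String>(() -> "dealer.zip");
        Source<String> b = new Source<String>(() -> "general.zip");
        a.map(MapReturnsNewSource::unzip); // returned Source is ignored
        b.map(MapReturnsNewSource::unzip); // returned Source is ignored
        Source<String> zipped = Source.zip(a, b, (x, y) -> x + " + " + y);
        return zipped.get();
    }

    // The fix: keep the Sources returned by map() and zip those.
    static String zipMapped() {
        Source<String> a = new Source<String>(() -> "dealer.zip");
        Source<String> b = new Source<String>(() -> "general.zip");
        Source<String> mappedA = a.map(MapReturnsNewSource::unzip);
        Source<String> mappedB = b.map(MapReturnsNewSource::unzip);
        Source<String> zipped = Source.zip(mappedA, mappedB, (x, y) -> x + " + " + y);
        return zipped.get();
    }

    public static void main(String[] args) {
        System.out.println(zipOriginals() + " | mapper calls: " + mapperCalls.get());
        System.out.println(zipMapped() + " | mapper calls: " + mapperCalls.get());
    }
}
```

In `zipOriginals()` the mapper is never called, because nothing ever subscribes to the sources that `map()` returned. In `zipMapped()` it runs once per source.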
I think you wanted to do the following:
Observable a = ... .map(...);
Observable b = ... .map(...);
Observable.zip(a, b).subscribe(f);
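With RxJava 2 itself, the corrected chain could look roughly like the sketch below. It assumes RxJava 2 (`io.reactivex`) is on the classpath. `saveAndUnzip` and the `Observable.just(...)` sources are hypothetical placeholders for the Retrofit calls and the file handling from the question. Because the mapped observables are of type `Observable<String>`, the zip function receives the two file paths as `String` arguments, which should also cover the second question. Also note that RxJava 2 does not allow `null` items, so a zip function that returns `null` ends up in `onError` with a `NullPointerException` instead of reaching `onNext`.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

class ZipMappedObservables {

    // Hypothetical placeholder for "write to disk and unzip"; returns a fake path.
    static String saveAndUnzip(String name) {
        return "/downloads/" + name + ".zip";
    }

    static String run() {
        // Keep the observable returned by map(); each source runs on its own thread.
        Observable<String> dealerPath = Observable.just("dealer")
                .subscribeOn(Schedulers.newThread())
                .map(ZipMappedObservables::saveAndUnzip);
        Observable<String> generalPath = Observable.just("general")
                .subscribeOn(Schedulers.newThread())
                .map(ZipMappedObservables::saveAndUnzip);

        // The zip function now gets two Strings and must return a non-null value.
        Observable<String> zipped = Observable.zip(dealerPath, generalPath,
                (dealer, general) -> dealer + "," + general);

        // blockingFirst() is used only to keep this example synchronous.
        return zipped.blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In real code you would subscribe with an observer instead of blocking, and log or handle the exception in `onError` so that failures inside `map()` or the zip function do not go unnoticed.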
Upvotes: 2