Reputation: 133
I am trying to do the following; sync a cloud DB using Retrofit to a local SqLite DB (Room) on a device. The DB could get large, around 100,000 registers or more, so the sync process can take some time. So it send a first Retrofit request to get the number of register, so it can calculate the total number of pages, after that it will send multiple Retrofit Request, to get all the data from API, after each request, it saves the data to Room.
Right now, I am having trouble combining two RxJava calls or process, also on the second RxJava process, after a Retrofit call, there is a Room Insert of a List-of-Objets, but after the hole process ends, I notice that not 100% of all the records are inserted, every time that I run the process, the number of records inserted change, it is around 80% - 98%, but never 100%, even though all the Retrofit calls are sent.
Please help me with:
Following the code:
Gradle
def room_version = "2.2.5"
//RxJava 2
implementation "io.reactivex.rxjava2:rxjava:2.2.19"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
//Retrofit
implementation 'com.squareup.retrofit2:retrofit:2.8.1'
implementation 'com.squareup.retrofit2:converter-gson:2.8.1'
//Retrofit2 Adapter for RxJava 2
implementation "com.squareup.retrofit2:adapter-rxjava2:2.8.1"
//okhttp3 Logging Interceptor
implementation "com.squareup.okhttp3:logging-interceptor:4.5.0"
//Room
implementation "androidx.room:room-runtime:$room_version"
annotationProcessor "androidx.room:room-compiler:$room_version"
//RxJava support for Room
implementation "androidx.room:room-rxjava2:$room_version"
ItemSyncDetails
...
public class ItemSyncDetails {
@SerializedName("CurrentPage")
int currentPage;
@SerializedName("PageCount")
int pageCount;
@SerializedName("PageSize")
int pageSize;
@SerializedName("RecordCount")
int recordCount;
@SerializedName("Results")
List<Item> mItemList;
...
}
ItemDao
Note: I haven't used Observer/Flowable/Maybe/Single, because I having been able to make it work with RxJava
import io.reactivex.Flowable;
@Dao
public interface ItemDao {
@Insert(onConflict = OnConflictStrategy.REPLACE)
long insert(Item item);
@Insert(onConflict = OnConflictStrategy.REPLACE)
List<Long> insertAll(List<Item> items);
...
DataApi
import io.reactivex.rxjava3.core.Observable;
...
public interface DataApi {
@GET("item")
Observable<ItemSyncDetails> getItemsByPage(
@Query("pageSize") Integer pageSize,
@Query("currentPage") Integer currentPage,
@Query("sortBy") Integer sortBy
);
ItemRepository
import io.reactivex.Observable;
...
public class ItemRepository {
...
public ItemRepository(Application application) {
mDataApi = RetrofitClient.getRetrofitInstance("http://192.168.1.100").create(DataApi.class);
RfidDatabase db = RfidDatabase.getAppDatabase(application);
itemDao = db.itemDao();
itemList = itemDao.getAllItems();
inserts = 0;
}
public List<Long> insertAllLocal (List<Item> itemList) {
List<Long> items = itemDao.insertAll(itemList);
inserts += items.size();
Log.i(TAG, "************insertAllLocal - ItemRepository: " + inserts + "*************");
Log.i(TAG, "************insertAllLocal - ItemRepository: " + items);
return items;
}
public Observable<ItemSyncDetails> getRecordsCount(){
return mDataApi.getItemsByPage(1,1,1);
}
public Observable<ItemSyncDetails> getItemsPerPage(int pageSize,int currentPage){
return mDataApi.getItemsByPage(pageSize,currentPage,1);
}
...
SyncConfigFragment
import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedule
...
public class SyncConfigFragment extends Fragment {
private ItemViewModel itemViewModel;
private ImageView imageSyncItems;
private ProgressDialog progressDialog;
private TextView tvSyncDescriptionItems;
private DataApi service;
private ItemSyncDetails mItemSyncDetails;
private List<Item> mItemlist;
private CompositeDisposable mCompositeDisposable;
private int mNumPages;
private int syncProgress;
...
@Override
public View onCreateView(LayoutInflater inflater, ViewGroup container, Bundle savedInstanceState) {
View view = inflater.inflate(R.layout.fragment_config_sync,container, false);
progressDialog = new ProgressDialog(getActivity());
sharedPref = getActivity().getSharedPreferences(
getString(R.string.sharepref_filename), Context.MODE_PRIVATE);
mItemlist = new ArrayList<Item>();
mCompositeDisposable = new CompositeDisposable();
itemViewModel = ViewModelProviders.of(this).get(ItemViewModel.class);
tvSyncDescriptionItems = view.findViewById(R.id.tvDescriptionSyncItems);
if(sharedPref.contains("last_sync_item")) {
tvSyncDescriptionItems.setText("Última actualización " + sharedPref.getString("last_sync_item",""));
} else{
tvSyncDescriptionItems.setText("No se ha Sincronizado");
}
imageSyncItems = view.findViewById(R.id.imageViewSyncItems);
imageSyncItems.setOnClickListener(clickListener);
return view;
}
private View.OnClickListener clickListener = new View.OnClickListener() {
public void onClick(View v) {
if (v.equals(imageSyncItems)) {
//If I uncomment the next line it does not work
//mCompositeDisposable.add(
mNumPages = 0;
syncProgress = 0;
showProgressDialog("Items");
getRecordsCount();
//); Closing round bracket for mCompositeDisposable
}
}
};//End View.OnClickListener
private void getRecordsCount(){
itemViewModel.getRecordsCount()
.subscribeOn(Schedulers.io())
.retry(3)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::HandleResults, this::handleError,this::getNumPagesHandlerComplete );
}
private void HandleResults(ItemSyncDetails itemSyncDetails) {
this.mItemSyncDetails = itemSyncDetails;
int pageSize = 100;
int numPages = itemSyncDetails.getRecordCount()/pageSize;
if (itemSyncDetails.getRecordCount() < pageSize || itemSyncDetails.getRecordCount()%pageSize != 0){
numPages++;
}
this.mNumPages = numPages;
}
private void getNumPagesHandlerComplete() {
getAllRecords(mNumPages);
}
private void handleError(Throwable throwable) {
tvSyncDescriptionItems.setText("**********Error de conexión...");
closeProgressDialog();
}
private void getAllRecords(int numPages){
//numPages: total of pages are the number of times to send the request to API
Observable.range(1, numPages)
.flatMap(i -> itemViewModel.getItemsPerPage(100,i))
.map(new Function<ItemSyncDetails, Integer>() {
@Override
public Integer apply(ItemSyncDetails itemSyncDetails) throws Throwable {
return itemViewModel.insertAllLocal(itemSyncDetails.getItemList()).size();
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::getAllHandleResults, this::handleError,this::handleComplete);
}
private void getAllHandleResults(Integer i) {
progressDialog.setProgress(getProgress(i));
}
private void handleComplete() {
//last request finished
closeProgressDialog();
}
private int getProgress(int newItems){
syncProgress += newItems;
int progress = 0;
if (syncProgress == mItemSyncDetails.getRecordCount()){
progress = 100;
} else {
progress = (100 * syncProgress)/mItemSyncDetails.getRecordCount();
}
return progress;
}
...
}
http://192.168.1.10:82/api/v1.0/item?pageSize=1¤tPage=1&sortBy=1
Note: The page size could change, I am using a fixed size of a 100 items per page.
{
Results: [
{
epc: "202020202020202030303031",
barcode: "0001",
name: "Televisor Samnsung",
description: "0001",
creation_date: "2020-02-26T10:55:06",
last_update: "2020-02-26T10:55:06",
last_seen: "2020-02-26T10:55:06",
brand: "Samnsung",
serial_number: "0001",
parent: "",
fk_category: 1,
responsable: "",
purchase_date: "2020-02-26T10:55:06",
cost: 0,
fk_location: 1008,
fk_item_state: 1,
inventory_date: "2020-02-26T10:55:06"
}
],
CurrentPage: 1,
PageCount: 65565,
PageSize: 1,
RecordCount: 65565
}
Upvotes: 3
Views: 1660
Reputation: 3021
You posted a json response here before the edit.
CurrentPage: 1,
PageCount: 65566,
PageSize: 1,
RecordCount: 65566
If I understand correctly, then you have 65k items and 1 item in each page. Meaning 65k pages which means 65k network calls. That's a lot. You could improve this design first.
This way you reduce a lot of network calls and possibly reduce the wait time for sync.
As to your actual rx question:
val pageSize = 100
viewModel.getRecordsCount()
.map {
// logic from `HandleResults` function
// do some calculation
var numPages: Int = it.records / pageSize
if (it.records < pageSize || it.records % pageSize != 0) {
numPages++
}
return@map numPages
}
.flatMap { pages -> Observable.range(1, pages) }
.flatMap { page -> viewModel.getItemsPerPage(pageSize, page) }
.flatMap { itemSyncDetails ->
val items = viewModel.insertAllLocal(itemSyncDetails.getItemList())
return@flatMap Observable.just(items.size)
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(....)
I notice that not 100% of all the records are inserted, every time that I run the process, the number of records inserted change, it is around 80% - 98%, but never 100%, even though all the Retrofit calls are sent.
Log the error in handleError
function and see what the actual problem is.
Upvotes: 1