Marcin Kunert

Reputation: 6295

RxJava: Split observable into two lists

Use case: I have to update the orders stored in the local database. The steps are:

  1. Download the data from the backend (~800 items).
  2. Check whether the local database already contains the item. The backend team uses a string as the primary key.
  3. If the item is not in the database, add it.
  4. If the item is in the database, update it.

My first solution was quite simple: call the backend and run two database queries for each item, the first to check whether it is already there and the second to either add or update it.

As you may imagine, it was slow: about 17.5 s for each update.

Second solution: cache the database data in a list and search that ArrayList instead of querying the database for every item. This only brought the update time down to 16.5 s. I used defer to create an Observable from the database call and combineLatest to combine the results.
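The list cache still scans the whole list once per downloaded item. A minimal sketch of indexing the cached orders by their string key in a HashMap instead, so each check is a hash lookup rather than a linear search. The `Order` class here is a hypothetical stand-in with an assumed `getNumber()` getter, not the real ORM entity:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderIndex {
    // Hypothetical minimal entity; the real Order comes from the ORM.
    public static class Order {
        private final String number;
        public Order(String number) { this.number = number; }
        public String getNumber() { return number; }
    }

    // Build the index once: order number (string primary key) -> cached entity.
    public static Map<String, Order> indexByNumber(List<Order> orders) {
        Map<String, Order> byNumber = new HashMap<>(orders.size() * 2);
        for (Order order : orders) {
            byNumber.put(order.getNumber(), order);
        }
        return byNumber;
    }

    public static void main(String[] args) {
        Map<String, Order> index = indexByNumber(Arrays.asList(new Order("A-1"), new Order("B-2")));
        System.out.println(index.containsKey("A-1")); // true
        System.out.println(index.get("C-3"));         // null
    }
}
```

With such an index, the `getOrderByOrderNumber(orders, orderDto.getNumber())` call in the question's code could become `byNumber.get(orderDto.getNumber())`. The timings suggest the writes dominated, so expect only a modest gain at ~800 items.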

Current solution: the bottleneck was, of course, the database writes. The ORM library I'm using supports batch inserts and updates, so I have to build two lists: DataToUpdate and DataToInsert.
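Building the two lists is a partition of the downloaded items on "already stored or not". A sketch using `Collectors.partitioningBy`, assuming the stored order numbers are already in a `Set<String>`; `OrderDto` here is a hypothetical minimal class that only mirrors the `getNumber()` call from the question's code:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class SplitOrders {
    // Hypothetical minimal DTO; only the order number matters for the split.
    public static class OrderDto {
        private final String number;
        public OrderDto(String number) { this.number = number; }
        public String getNumber() { return number; }
    }

    // true -> already stored (update batch), false -> new (insert batch).
    // partitioningBy always returns both keys, even when a partition is empty.
    public static Map<Boolean, List<OrderDto>> partition(Set<String> storedNumbers, List<OrderDto> dtos) {
        return dtos.stream()
                .collect(Collectors.partitioningBy(dto -> storedNumbers.contains(dto.getNumber())));
    }

    public static void main(String[] args) {
        Set<String> stored = new HashSet<>(Arrays.asList("A-1", "B-2"));
        List<OrderDto> dtos = Arrays.asList(new OrderDto("A-1"), new OrderDto("C-3"), new OrderDto("B-2"));
        Map<Boolean, List<OrderDto>> parts = partition(stored, dtos);
        System.out.println("update: " + parts.get(true).size());  // update: 2
        System.out.println("insert: " + parts.get(false).size()); // insert: 1
    }
}
```

Each partition can then be handed to its batch call (`updateInTx` for `true`, `insertInTx` for `false`) after mapping the DTOs to entities.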

The current solution runs in about 5.3 s. I'm happy with the time, but unhappy with how non-reactive it is.

long start = System.currentTimeMillis(); // used by the duration log below
Observable<List<OrderDto>> getAllOrdersObservable = backendService.getAllOrders();

ordersDisposable = Observable.combineLatest(getStoredOrder(), getAllOrdersObservable, this::insertOrUpdateOrders)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnComplete(() -> {
            Log.d(TAG, "Duration: " + (System.currentTimeMillis() - start) + " ms");
            showSyncButton();
            notifyListenersDataDownloaded();
        })
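        // RxJava 2 routes errors without an onError handler to RxJavaPlugins, which crashes by default
        .subscribe(ignored -> { }, error -> Log.e(TAG, "Order sync failed", error));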


private Observable<List<Order>> getStoredOrder() {
    return Observable.defer(() -> {
        Log.d(TAG, "Started loading orders from database");
        Observable<List<Order>> just = Observable.just(orderDao.loadAll());
        Log.d(TAG, "Ended loading orders from database");
        return just;
    });
}

// Splits the downloaded DTOs into new and existing orders, then writes each group as one batch.
private List<Order> insertOrUpdateOrders(List<Order> orders, List<OrderDto> orderDtos) {
    List<Order> ordersToInsert = new LinkedList<>();
    List<Order> ordersToUpdate = new LinkedList<>();

    for (OrderDto orderDto : orderDtos) {
        Order order = getOrderByOrderNumber(orders, orderDto.getNumber());
        if (order != null) {
            dtoToEntityTransformer.updateFields(orderDto, order);
            ordersToUpdate.add(order);
        } else {
            order = dtoToEntityTransformer.transformToEntity(orderDto);
            ordersToInsert.add(order);
        }
    }

    orderDao.insertInTx(ordersToInsert);
    orderDao.updateInTx(ordersToUpdate);

    return orders;
}

The question

Do you have an idea how to solve this in a reactive way? Is there an operator that lets me split the observable into two lists? Or should I use a global variable (which seems like a bad idea) to keep track of which data to insert and which to update?

Upvotes: 1

Views: 1338

Answers (1)

Tassos Bassoukos

Reputation: 16152

Use groupBy, with a key that says whether an item is an insert or an update. groupBy emits one GroupedObservable per key, and each one exposes its key via getKey(). Use that key to perform the appropriate operation.
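A minimal sketch of the groupBy approach, assuming RxJava 2 (`io.reactivex`). The `Op` enum, `writeBatch` and `splitAndWrite` are hypothetical names; `writeBatch` only stands in for the `orderDao` batch calls, and order numbers are plain strings to keep the example self-contained:

```java
import io.reactivex.Observable;
import io.reactivex.Single;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class GroupBySplit {
    // Hypothetical key type: which batch operation an item belongs to.
    public enum Op { INSERT, UPDATE }

    // Stand-in for orderDao.insertInTx / orderDao.updateInTx: it only describes the batch.
    static Single<String> writeBatch(Op op, List<String> batch) {
        return Single.fromCallable(() -> op + " " + batch);
    }

    // Groups incoming order numbers by operation and emits one result per written batch.
    public static Observable<String> splitAndWrite(Set<String> storedNumbers, Observable<String> incoming) {
        return incoming
                // One GroupedObservable per key; getKey() tells which operation it represents.
                .groupBy(number -> storedNumbers.contains(number) ? Op.UPDATE : Op.INSERT)
                // toList() emits once the upstream completes, so each group is written in one go.
                .flatMapSingle(group -> group.toList()
                        .flatMap(batch -> writeBatch(group.getKey(), batch)));
    }

    public static void main(String[] args) {
        Set<String> stored = new HashSet<>(Arrays.asList("A-1", "B-2"));
        splitAndWrite(stored, Observable.fromArray("A-1", "C-3", "B-2", "D-4"))
                .blockingSubscribe(System.out::println);
    }
}
```

Running `main` prints one line per group, `UPDATE [A-1, B-2]` and `INSERT [C-3, D-4]`, not necessarily in that order. Because each group is collected with toList(), nothing is written until the source completes, which fits a one-shot download. The question's `getAllOrders()` emits a `List<OrderDto>`, so it would first need flattening, e.g. with `flatMapIterable(list -> list)`.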

Upvotes: 1
