Matthew Smith
Matthew Smith

Reputation: 4637

Using RxJava to join local data with remote ( or cached ) data

This is working code but I have a few questions as well as a request for advice on improving it. I am new to RxJava and I have not fully wrapped my head around how to chain these type of observables together.

I have two model objects, ListItem and UserInfo. ListItems exists in a local database and the UserInfo is fetched from the server by using an ID provided from the ListItem.

The UserInfo web service accepts an array of IDs for which it will return a list of UserInfo objects.

The flow of this code is as follows:

  1. Load ListItems from database
  2. Using the ListItems fetched from the database, check an in memory cache to see if I've already fetched the UserInfo for a particular ListItem
  3. For any items whose UserInfo is not cached, fetch them from the network
  4. Place the fetched UserInfo objects into the cache
  5. Re-run step 2 ( the method is loadCachedUserInfo)
  6. Return results to subscriber

NOTE: The UserInfo objects should only be fetched for a ListItem if the list has been deemed a isUserList.

Here is the code:

fun itemsInList(list : ATList, parentValue : String? = null, searchString : String? = null, limit : Int = defaultFetchLimit, sortOrder: SortDescriptor? = null) : Observable<List<ATListItem>> {
    return Observable.create<List<ATListItem>> { subscriber ->
        val listItems = listItemsInList(list, parentValue = parentValue, searchString = searchString, limit = limit, sortOrder = sortOrder)
        subscriber.onNext(listItems)
        subscriber.onCompleted()
    }.flatMap { listItems ->
        if ( list.isUserList ) {
            return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
        }
        return@flatMap Observable.just(listItems)
    }.flatMap { listItems ->
        if ( list.isUserList ) {
            return@flatMap fetchUserInfoForListItems(listItems, list.userIDIndex!!, force = false)
        }
        return@flatMap Observable.just(listItems)
    }
}

fun loadCachedUserInfo(listItems : List<ATListItem>, userIDIndex : Int) : Observable<List<ATListItem>> {
    return Observable.create<List<ATListItem>> { subscriber ->
        for ( listItem in listItems ) {
            listItem.coreUserInfo = coreUserMap[listItem.valueForAttributeIndex(userIDIndex)?.toLowerCase()]
        }
        subscriber.onNext(listItems)
        subscriber.onCompleted()
    }
}

fun fetchUserInfoForListItems(listItems : List<ATListItem>, userIDIndex: Int, force: Boolean) : Observable<List<ATListItem>> {
    val itemsToFetch = if ( force ) listItems else listItems.filter { it.coreUserInfo == null }
    val ids = itemsToFetch.map { it.valueForAttributeIndex(userIDIndex) ?: "" }.filter { !it.isEmpty() }
    val records = hashMapOf("records" to ids)
    if ( itemsToFetch.count() == 0 ) {
        return Observable.just(listItems)
    }
    return RuntimeDataController.dataService.fetchCoreUserInfo(recordsMap = records)
            .map { json ->
                val recordsArray = json.arrayValue("records")
                for ( i in 0..recordsArray.length() - 1) {
                    val coreUserInfo = CoreUserInfo(recordsArray.getJSONObject(i))
                    coreUserMap[coreUserInfo.username.toLowerCase()] = coreUserInfo
                    coreUserMap[coreUserInfo.userID] = coreUserInfo
                    coreUserInfo.externalUserID?.let { coreUserMap[it] = coreUserInfo }
                }
                return@map listItems
            }.flatMap { loadCachedUserInfo(listItems, userIDIndex = userIDIndex) }
}

The user would initiate the sequence of events by calling:

ListController.itemsInList(list)

My questions about this code are:

  1. Currently loadCachedUserInfo takes in an array of ListItem and returns that same array as an observable after the cached items have been associated with it. This feels wrong to me. I think instead this call should only return the items that have a cached UserInfo associated with it. However, I need to continue passing the full array of ListItem to the next method

2.) Do I need to do additional work to support unsubscribing?

3.) This is similar question 1. My fetchUserInfoForListItems takes an array of list items and returns an observable with that same array of list items after they have been fetched and re-run through the cache method. This also feels incorrect to me. I would rather this method return an Observable<List<UserInfo>> for the objects that were fetched. I am not understanding how in itemsInList to then associate the ListItems with the newly fetched UserInfo and return an Observable of those ListItems.

Edit: After writing this post it gave helped me realize a few things. I can flatMap wrap my calls in an Observable.create that can contain the smarts I wanted to take out of my fetchUserInfoForListItems, letting me address question #3. Here is the updated code:

 fun itemsInList(list : ATList, parentValue : String? = null, searchString : String? = null, limit : Int = defaultFetchLimit, sortOrder: SortDescriptor? = null) : Observable<List<ATListItem>> {
    return Observable.create<List<ATListItem>> { subscriber ->
        val listItems = listItemsInList(list, parentValue = parentValue, searchString = searchString, limit = limit, sortOrder = sortOrder)
        subscriber.onNext(listItems)
        subscriber.onCompleted()
    }.flatMap { listItems ->
        if ( list.isUserList ) {
            return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
        }
        return@flatMap Observable.just(listItems)
    }.flatMap { listItems ->
        if ( list.isUserList ) {
            return@flatMap Observable.create<List<ATListItem>> { subscriber ->
                fetchUserInfoForListItems(listItems, list.userIDIndex!!, force = false).map { userInfoList ->
                    for (coreUserInfo in userInfoList) {
                        coreUserMap[coreUserInfo.username.toLowerCase()] = coreUserInfo
                        coreUserMap[coreUserInfo.userID] = coreUserInfo
                        coreUserInfo.externalUserID?.let { coreUserMap[it] = coreUserInfo }
                    }
                }.flatMap {
                    loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
                }.subscribe {
                    subscriber.onNext(listItems)
                    subscriber.onCompleted()
                }
            }
        }
        return@flatMap Observable.just(listItems)
    }
}

fun loadCachedUserInfo(listItems : List<ATListItem>, userIDIndex : Int) : Observable<List<ATListItem>> {
    return Observable.create<List<ATListItem>> { subscriber ->
        listItems.forEach { listItem -> listItem.coreUserInfo = coreUserMap[listItem.valueForAttributeIndex(userIDIndex)?.toLowerCase()] }
        subscriber.onNext(listItems)
        subscriber.onCompleted()
    }
}

fun fetchUserInfoForListItems(listItems : List<ATListItem>, userIDIndex: Int, force: Boolean) : Observable<List<CoreUserInfo>> {
    val itemsToFetch = if ( force ) listItems else listItems.filter { it.coreUserInfo == null }
    val ids = itemsToFetch.map { it.valueForAttributeIndex(userIDIndex) ?: "" }.filter { !it.isEmpty() }
    val records = hashMapOf("records" to ids)
    if ( itemsToFetch.count() == 0 ) { return Observable.just(ArrayList<CoreUserInfo>()) }
    return RuntimeDataController.dataService.fetchCoreUserInfo(recordsMap = records)
            .map { json ->
                val userInfo = ArrayList<CoreUserInfo>()
                json.arrayValue("records").eachObject { userInfo.add(CoreUserInfo(it)) }
                return@map userInfo
            }
}

Upvotes: 3

Views: 1210

Answers (1)

pt2121
pt2121

Reputation: 11870

  1. Currently loadCachedUserInfo takes in an array of ListItem and returns that same array as an observable after the cached items have been associated with it. This feels wrong to me. I think instead this call should only return the items that have a cached UserInfo associated with it. However, I need to continue passing the full array of ListItem to the next method

I am not sure I understand you correctly but if you only need side-effect (caching), you can just use doOnNext. For example,

.doOnNext { listItems ->
    if ( list.isUserList ) {
        cache(listItems, userIDIndex = list.userIDIndex!!)
    }
}

fun cache(listItems : List<ATListItem>, userIDIndex : Int) {
    // caching
}
  1. Do I need to do additional work to support unsubscribing?

No, AFAIK.

Note:

More info about doOnNext can be found at What is the purpose of doOnNext(...) in RxJava and here

Normally you don't need return@... if the last statement in lambda is an expression. e.g.:

.flatMap { listItems ->
    if ( list.isUserList ) {
        return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
    }
    return@flatMap Observable.just(listItems)
}    

can be written like:

.flatMap { listItems ->
    if ( list.isUserList )
        loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
    else
        Observable.just(listItems)
} 

I didn't test the code.

Upvotes: 3

Related Questions