Reputation: 4637
This is working code but I have a few questions as well as a request for advice on improving it. I am new to RxJava and I have not fully wrapped my head around how to chain these type of observables together.
I have two model objects, ListItem
and UserInfo
. ListItem
s exists in a local database and the UserInfo
is fetched from the server by using an ID provided from the ListItem
.
The UserInfo
web service accepts an array of IDs for which it will return a list of UserInfo
objects.
The flow of this code is as follows:
ListItem
s from databaseListItem
s fetched from the database, check an in memory cache to see if I've already fetched the UserInfo
for a particular ListItem
UserInfo
is not cached, fetch them from the networkUserInfo
objects into the cacheloadCachedUserInfo
)NOTE: The UserInfo
objects should only be fetched for a ListItem
if the list has been deemed a isUserList
.
Here is the code:
fun itemsInList(list : ATList, parentValue : String? = null, searchString : String? = null, limit : Int = defaultFetchLimit, sortOrder: SortDescriptor? = null) : Observable<List<ATListItem>> {
return Observable.create<List<ATListItem>> { subscriber ->
val listItems = listItemsInList(list, parentValue = parentValue, searchString = searchString, limit = limit, sortOrder = sortOrder)
subscriber.onNext(listItems)
subscriber.onCompleted()
}.flatMap { listItems ->
if ( list.isUserList ) {
return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
}
return@flatMap Observable.just(listItems)
}.flatMap { listItems ->
if ( list.isUserList ) {
return@flatMap fetchUserInfoForListItems(listItems, list.userIDIndex!!, force = false)
}
return@flatMap Observable.just(listItems)
}
}
fun loadCachedUserInfo(listItems : List<ATListItem>, userIDIndex : Int) : Observable<List<ATListItem>> {
return Observable.create<List<ATListItem>> { subscriber ->
for ( listItem in listItems ) {
listItem.coreUserInfo = coreUserMap[listItem.valueForAttributeIndex(userIDIndex)?.toLowerCase()]
}
subscriber.onNext(listItems)
subscriber.onCompleted()
}
}
fun fetchUserInfoForListItems(listItems : List<ATListItem>, userIDIndex: Int, force: Boolean) : Observable<List<ATListItem>> {
val itemsToFetch = if ( force ) listItems else listItems.filter { it.coreUserInfo == null }
val ids = itemsToFetch.map { it.valueForAttributeIndex(userIDIndex) ?: "" }.filter { !it.isEmpty() }
val records = hashMapOf("records" to ids)
if ( itemsToFetch.count() == 0 ) {
return Observable.just(listItems)
}
return RuntimeDataController.dataService.fetchCoreUserInfo(recordsMap = records)
.map { json ->
val recordsArray = json.arrayValue("records")
for ( i in 0..recordsArray.length() - 1) {
val coreUserInfo = CoreUserInfo(recordsArray.getJSONObject(i))
coreUserMap[coreUserInfo.username.toLowerCase()] = coreUserInfo
coreUserMap[coreUserInfo.userID] = coreUserInfo
coreUserInfo.externalUserID?.let { coreUserMap[it] = coreUserInfo }
}
return@map listItems
}.flatMap { loadCachedUserInfo(listItems, userIDIndex = userIDIndex) }
}
The user would initiate the sequence of events by calling:
ListController.itemsInList(list)
My questions about this code are:
loadCachedUserInfo
takes in an array of ListItem
and returns that same array as an observable after the cached items have been associated with it. This feels wrong to me. I think instead this call should only return the items that have a cached UserInfo
associated with it. However, I need to continue passing the full array of ListItem
to the next method2.) Do I need to do additional work to support unsubscribing?
3.) This is similar question 1. My fetchUserInfoForListItems
takes an array of list items and returns an observable with that same array of list items after they have been fetched and re-run through the cache method. This also feels incorrect to me. I would rather this method return an Observable<List<UserInfo>>
for the objects that were fetched. I am not understanding how in itemsInList
to then associate the ListItem
s with the newly fetched UserInfo
and return an Observable of those ListItem
s.
Edit: After writing this post it gave helped me realize a few things. I can flatMap wrap my calls in an Observable.create that can contain the smarts I wanted to take out of my fetchUserInfoForListItems
, letting me address question #3. Here is the updated code:
fun itemsInList(list : ATList, parentValue : String? = null, searchString : String? = null, limit : Int = defaultFetchLimit, sortOrder: SortDescriptor? = null) : Observable<List<ATListItem>> {
return Observable.create<List<ATListItem>> { subscriber ->
val listItems = listItemsInList(list, parentValue = parentValue, searchString = searchString, limit = limit, sortOrder = sortOrder)
subscriber.onNext(listItems)
subscriber.onCompleted()
}.flatMap { listItems ->
if ( list.isUserList ) {
return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
}
return@flatMap Observable.just(listItems)
}.flatMap { listItems ->
if ( list.isUserList ) {
return@flatMap Observable.create<List<ATListItem>> { subscriber ->
fetchUserInfoForListItems(listItems, list.userIDIndex!!, force = false).map { userInfoList ->
for (coreUserInfo in userInfoList) {
coreUserMap[coreUserInfo.username.toLowerCase()] = coreUserInfo
coreUserMap[coreUserInfo.userID] = coreUserInfo
coreUserInfo.externalUserID?.let { coreUserMap[it] = coreUserInfo }
}
}.flatMap {
loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
}.subscribe {
subscriber.onNext(listItems)
subscriber.onCompleted()
}
}
}
return@flatMap Observable.just(listItems)
}
}
fun loadCachedUserInfo(listItems : List<ATListItem>, userIDIndex : Int) : Observable<List<ATListItem>> {
return Observable.create<List<ATListItem>> { subscriber ->
listItems.forEach { listItem -> listItem.coreUserInfo = coreUserMap[listItem.valueForAttributeIndex(userIDIndex)?.toLowerCase()] }
subscriber.onNext(listItems)
subscriber.onCompleted()
}
}
fun fetchUserInfoForListItems(listItems : List<ATListItem>, userIDIndex: Int, force: Boolean) : Observable<List<CoreUserInfo>> {
val itemsToFetch = if ( force ) listItems else listItems.filter { it.coreUserInfo == null }
val ids = itemsToFetch.map { it.valueForAttributeIndex(userIDIndex) ?: "" }.filter { !it.isEmpty() }
val records = hashMapOf("records" to ids)
if ( itemsToFetch.count() == 0 ) { return Observable.just(ArrayList<CoreUserInfo>()) }
return RuntimeDataController.dataService.fetchCoreUserInfo(recordsMap = records)
.map { json ->
val userInfo = ArrayList<CoreUserInfo>()
json.arrayValue("records").eachObject { userInfo.add(CoreUserInfo(it)) }
return@map userInfo
}
}
Upvotes: 3
Views: 1210
Reputation: 11870
- Currently loadCachedUserInfo takes in an array of ListItem and returns that same array as an observable after the cached items have been associated with it. This feels wrong to me. I think instead this call should only return the items that have a cached UserInfo associated with it. However, I need to continue passing the full array of ListItem to the next method
I am not sure I understand you correctly but if you only need side-effect (caching), you can just use doOnNext
. For example,
.doOnNext { listItems ->
if ( list.isUserList ) {
cache(listItems, userIDIndex = list.userIDIndex!!)
}
}
fun cache(listItems : List<ATListItem>, userIDIndex : Int) {
// caching
}
- Do I need to do additional work to support unsubscribing?
No, AFAIK.
Note:
More info about doOnNext
can be found at What is the purpose of doOnNext(...) in RxJava and here
Normally you don't need return@...
if the last statement in lambda is an expression.
e.g.:
.flatMap { listItems ->
if ( list.isUserList ) {
return@flatMap loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
}
return@flatMap Observable.just(listItems)
}
can be written like:
.flatMap { listItems ->
if ( list.isUserList )
loadCachedUserInfo(listItems, userIDIndex = list.userIDIndex!!)
else
Observable.just(listItems)
}
I didn't test the code.
Upvotes: 3