Reputation: 1365
I ran into a situation where I would fetch an API which will generate json data of registered users. I would then have to loop through each user and fetch their avatar from remote url and save it to disk. I can perform this second task inside subscribe
but this is not a best practice. I am trying to implement it with map
, flatMap
etc.
Here is my sample code:
self.dataManager.getUsers()
.observeOn(MainScheduler.instance)
.subscribeOn(globalScheduler)
.map{ [unowned self] (data) -> Users in
var users = data
// other code for manipulating users goes here
// then below I am trying to use another loop to fetch their avatars
if let cats = users.categories {
for cat in cats {
if let profiles = cat.profiles {
for profile in profiles {
if let thumbnail = profile.thumbnail,
let url = URL(string: thumbnail) {
URLSession.shared.rx.response(request: URLRequest(url: url))
.subscribeOn(MainScheduler.instance)
.subscribe(onNext: { response in
// Update Image
if let img = UIImage(data: response.data) {
try? Disk.save(img, to: .caches, as: url.lastPathComponent)
}
}, onError: { (error) in
}).disposed(by: self.disposeBag)
}
}
}
}
}
return users
}
.subscribe(onSuccess: { [weak self] (users) in
}).disposed(by: disposeBag)
There are 2 problems in this code. First is with the rx
on URLSession
which execute the task in background on another thread and there is no way to acknowledge the main subscribe
back when this operation will finish. Second is with the loop and rx which is not efficient as it should generate multiple observables and then process it.
Any idea to improve this logic is welcome.
Upvotes: 2
Views: 2337
Reputation: 33967
This was a fun puzzle.
The "special sauce" that solves the problem is in this line:
.flatMap {
Observable.combineLatest($0.map {
Observable.combineLatest(
Observable.just($0.0),
URLSession.shared.rx.data(request: $0.1)
.materialize()
)
})
}
The map
before the line creates an Observable<[(URL, URLRequest)]>
and the line in question converts it to an Observable<[(URL, Event<Data>)]>
.
The line does this by:
Observable<Data>
Observable<Event<Data>>
(this is done so an error in one download won't shutdown the entire stream.)Observable<URL>
Observable<(URL, Event<Data>)>
.[Observable<(URL, Event<Data>)>]
Observable<[(URL, Event<Data>)]>
Here is the code
// manipulatedUsers is for the code you commented out.
// users: Observable<Users>
let users = self.dataManager.getUsers()
.map(manipulatedUsers) // manipulatedUsers(_ users: Users) -> Users
.asObservable()
.share(replay: 1)
// this chain is for handling the users object. You left it blank in your code so I did too.
users
.observeOn(MainScheduler.instance)
.subscribe(onNext: { users in
})
.disposed(by: disposeBag)
// This navigates through the users structure and downloads the images.
// images: Observable<(URL, Event<Data>)>
let images = users.map { $0.categories ?? [] }
.map { $0.flatMap { $0.profiles ?? [] } }
.map { $0.compactMap { $0.thumbnail } }
.map { $0.compactMap { URL(string: $0) } }
.map { $0.map { ($0, URLRequest(url: $0)) } }
.flatMap {
Observable.combineLatest($0.map {
Observable.combineLatest(
Observable.just($0.0),
URLSession.shared.rx.data(request: $0.1)
.materialize()
)
})
}
.flatMap { Observable.from($0) }
.share(replay: 1)
// this chain filters out the errors and saves the successful downloads.
images
.filter { $0.1.element != nil }
.map { ($0.0, $0.1.element!) }
.map { ($0.0, UIImage(data: $0.1)!) }
.observeOn(MainScheduler.instance)
.bind(onNext: { url, image in
try? Disk.save(image, to: .caches, as: url.lastPathComponent)
return // need two lines here because this needs to return Void, not Void?
})
.disposed(by: disposeBag)
// this chain handles the download errors if you want to.
images
.filter { $0.1.error != nil }
.bind(onNext: { url, error in
print("failed to download \(url) because of \(error)")
})
.disposed(by: disposeBag)
Upvotes: 3