Reputation: 1630
I am attempting to upload multiple photos to a server using ReactiveX (RxSwift), gathering the responses from each request, and then making one final request to complete the submission.
Everything seems to be working fairly well until I attempt to reduce
all of the responses. The final subscribeNext
is never called. (Perhaps I misunderstand how flatMap
or reduce
works?)
Specifically, this is how I am attempting to perform this procedure.
Prepare an observable to encode each photo (self.imageMgr
is an instance of PHCachingImageManager()
)
func getPhotoDataObservable(asset: PHAsset) -> Observable<NSData> {
return create { observer in
self.imageMgr.requestImageForAsset(asset,
targetSize: PHImageManagerMaximumSize,
contentMode: .AspectFit,
options: nil,
resultHandler: { (myImage, myInfo) -> Void in
let data = UIImageJPEGRepresentation(myImage!, 1.0)!
NSLog("Encoded photo")
observer.onNext(data)
self.converts += 1
if self.converts == self.userReview.photos.count {
NSLog("Completed encoding photos")
observer.onCompleted()
}
})
return NopDisposable.instance
}
}
Prepare an observable to upload each photo once encoded (with Alamofire and RxAlamofire)
func getPostPhotoObservable(photoData: NSData) -> Observable<ReviewPhotoObject> {
return create { observer in
NSLog("Uploading Photo")
upload(.POST,
urlRequest.URLString,
headers: nil,
multipartFormData: { mfd in
mfd.appendBodyPart(data: photoData, name: "image", fileName: "image", mimeType: "image/jpeg")
},
encodingMemoryThreshold: Manager.MultipartFormDataEncodingMemoryThreshold,
encodingCompletion: { encodingResult in
switch encodingResult {
case .Success(let upload, _, _):
upload.responseJSON(completionHandler: { (myResponse) -> Void in
if let photoResponse = myResponse.result.value {
let photoObject = photoResponse.objectForKey("photo")!
let photo = ReviewPhotoObject()
photo.photoID = photoObject.objectForKey("id")! as! NSNumber
NSLog("Uploaded Photo")
observer.onNext(photo)
}
self.uploads += 1
if self.uploads == self.userReview.photos.count {
NSLog("Completed uploading photos")
observer.onCompleted()
}
})
case .Failure(let encodingError):
observer.onError(encodingError)
print(encodingError)
}
})
return NopDisposable.instance
}
}
Finally, put it all together
func postReview(review: MyReview) {
self.userReview = review
_ = review.photos.toObservable().flatMap { photos in
return self.getPhotoDataObservable(photos)
}.flatMap { photoData in
return self.getPostPhotoObservable(photoData)
}.reduce([], { var accumulator, photo: ReviewPhotoObject) -> [Int] in
accumulator.append(Int(photo.photoID))
return accumulator
}).subscribeNext({ (photoIds) -> Void in
print(photoIds) // Never called
})
}
When run (with 2 photos for example), this is the output:
Encoded photo
Uploading photo
Encoded photo
Uploading photo
Completed encoding photos
Uploaded photo
Uploaded photo
Completed uploading photos
But subscribeNext
is never called. Since documentation on RxSwift specifically is still a little thin, I was hoping someone around here could clue me in on what I'm misunderstanding.
Upvotes: 4
Views: 1225
Reputation: 33967
The idea here is that once an observable is done sending all the elements it's going to send, it should complete. You are creating an observable for each PHAsset and that observable only sends one element so it should complete after that. The way you had the code, only the last one would complete, so the reduce
operator was just sitting around waiting for the rest to complete before it could finish its job.
Here is how I would have written the first function (In Swift 3 instead of 2.)
extension PHImageManager {
func requestMaximumSizeImage(for asset: PHAsset) -> Observable<UIImage> {
return .create { observer in
let request = self.requestImage(for: asset, targetSize: PHImageManagerMaximumSize, contentMode: .aspectFit, options: nil, resultHandler: { image, info in
if let image = image {
observer.onNext(image)
observer.onCompleted()
}
else if let info = info, let error = info[PHImageErrorKey] as? Error {
observer.onError(error)
}
})
return Disposables.create { self.cancelImageRequest(request) }
}
}
}
You will see that I would have made it an extension on PHImageManager instead of a free function, but that's just a style difference. The functional differences are that my code will emit an error if the underlying request errors out, and will cancel the request if the subscribers all bail before the request completes. Also, it doesn't do the JPEG conversion. Keep these operations small and do the JPEG conversion inside a map like this:
let imagesData = review.photos.toObservable().flatMap {
self.imageMgr.requestMaximumSizeImage(for: $0)
}.map {
UIImageJPEGRepresentation($0, 1.0)
}.filter { $0 != nil }.map { $0! }
The above code requests the images from the manager, then converts them to JPEG data, filtering out any that failed the conversion. imagesData
is an Observable<Data>
type.
Your getPostPhotoObservable
is fine except for the completed issue, and the fact that it doesn't handle cancelation in the disposable. Also, you could just have your post function return an Observable instead of wrapping the result in a ReviewPhotoObject.
Other warnings:
The way you are putting it all together doesn't ensure that the ReviewPhotoObject
s will be in the same order as the photos going in (because you can't guarantee the order that the uploads will complete in.) To fix this, if necessary, you would need to use concat
instead of flatMap
.
If any of the uploads fail, the entire pipeline will disconnect and abort any subsequent uploads. You should probably set up something to catch the errors and do something appropriate. Either catchErrorJustReturn
or catchError
depending on your requirements.
Upvotes: 1