ricardopereira

Reputation: 11683

RxSwift: use a combination of operators to download a photo and save it locally

I'm trying to perform the following operations in a reactive way:

  1. Request a photo download
  2. Get the download progress from next events
  3. When completed then save that photo locally

So I started with RxSwift and implemented it like this:

photoController.downloadPhoto(photoItem.photo)
    .doOnNext { downloadTaskInfo in
        photoItem.viewState = .NetworkProgress(task: downloadTaskInfo.task, progress: downloadTaskInfo.progress)
    }
    .flatMapLatest { downloadTaskInfo in
        return PHPhotoLibrary.savePhoto(downloadTaskInfo.buffer)
    }
    .observeOn(MainScheduler.instance)
    .subscribe(
        onError: { error in
            photoItem.viewState = .NetworkFailed
        },
        onCompleted: {
            photoItem.viewState = .Default
        }
    )
    .addDisposableTo(disposeBag)

But flatMapLatest doesn't do what I expected. I thought it would let me take the latest event and run another operation on it.

So I replaced it with reduce, but I don't think it's the right operator, because I don't want to merge all the download progress events into one value. What I want is to wait for the download to complete and then take the latest event to continue with other operations, such as saving the photo locally. With concat, I can't receive the result from the first Observable.

I need something like

// 😅
.waitUntilDownloadFinishesAndContinueWith { downloadTaskInfo in
    return PHPhotoLibrary.savePhoto(downloadTaskInfo.buffer)
}

Can someone explain the right way to design this?

UPDATE

I decided to go with withLatestFrom, but I'm still having problems: downloadPhotoObservable is being disposed too soon.

let downloadPhotoObservable = photoController.downloadPhoto(photoItem.photo)
    .doOnNext { downloadTaskInfo in
        photoItem.viewState = .NetworkProgress(task: downloadTaskInfo.task, progress: downloadTaskInfo.progress)
    }

Observable.just(photoItem)
    .withLatestFrom(downloadPhotoObservable)
    .map { downloadTaskInfo in
        PHPhotoLibrary.savePhoto(downloadTaskInfo.buffer)
    }
    .observeOn(MainScheduler.instance)
    .subscribe(
        onError: { error in
            photoItem.viewState = .NetworkFailed
        },
        onCompleted: {
            photoItem.viewState = .Default
        }
    )
    .addDisposableTo(disposeBag)

I'm doing something wrong for sure.

Upvotes: 3

Views: 1196

Answers (1)

ricardopereira

Reputation: 11683

I found a way to do what I wanted: filter the events and let through only the one whose buffer length equals the expected content length, which means the download is complete. That complete buffer is then passed on to the next step, saving the photo.

photoController.downloadPhoto(photoItem.photo)
    .downloadProgress()
    // Receive the download progress
    .doOnNext { downloadTaskInfo in
        photoItem.viewState = .NetworkProgress(task: downloadTaskInfo.task, progress: downloadTaskInfo.progress)
    }
    // Wait for the complete buffer
    .filter { downloadTaskInfo in
        downloadTaskInfo.contentLength == Int64(downloadTaskInfo.buffer.length)
    }
    // Save it locally
    .flatMap { downloadTaskInfo in
        PHPhotoLibrary.savePhoto(downloadTaskInfo.buffer)
    }
    .observeOn(MainScheduler.instance)
    .subscribe(
        onError: { error in
            photoItem.viewState = .NetworkFailed
        },
        onCompleted: {
            photoItem.viewState = .Default
        }
    )
    .addDisposableTo(disposeBag)
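
As an aside, if the download observable completes once the transfer finishes (an assumption, since downloadPhoto's implementation isn't shown here), takeLast(1) could stand in for the length comparison: it holds back every element and emits only the final one when the source completes. The sketch below uses the newer RxSwift spelling (do(onNext:), disposed(by:)) rather than the doOnNext/addDisposableTo used above, and DownloadInfo, fakeDownload and fakeSave are hypothetical stand-ins, not the real types from this answer.

```swift
import Foundation
import RxSwift

// Hypothetical stand-in for NetworkDownloadTaskInfo.
struct DownloadInfo {
    let buffer: Data
    let progress: Float
}

// Hypothetical download: emits two progress events, then completes.
func fakeDownload() -> Observable<DownloadInfo> {
    return Observable.from([
        DownloadInfo(buffer: Data([1, 2]), progress: 0.5),
        DownloadInfo(buffer: Data([1, 2, 3, 4]), progress: 1.0)
    ])
}

// Hypothetical save step standing in for PHPhotoLibrary.savePhoto;
// it just reports how many bytes it was given.
func fakeSave(_ data: Data) -> Observable<Int> {
    return Observable.just(data.count)
}

let disposeBag = DisposeBag()
var savedSizes: [Int] = []

fakeDownload()
    // Every progress event is still visible here.
    .do(onNext: { info in print("progress:", info.progress) })
    // Only the final element passes, and only once the source completes.
    .takeLast(1)
    // Runs once, with the complete buffer.
    .flatMap { info in fakeSave(info.buffer) }
    .subscribe(
        onNext: { size in savedSizes.append(size) },
        onError: { error in print("failed:", error) },
        onCompleted: { print("done") }
    )
    .disposed(by: disposeBag)
```

The trade-off is that takeLast(1) trusts the completion event, while the filter above trusts the reported content length; which one is safer depends on how the download observable behaves.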

BTW, I'm using the scan operator to accumulate the progress info. I wrapped it in a custom operator called downloadProgress:

extension ObservableType where E == NetworkDataTaskInfo {
    func downloadProgress() -> Observable<NetworkDownloadTaskInfo> {
        let seed = NetworkDownloadTaskInfo(task: NopNetworkTask(), buffer: NSMutableData(), progress: 0, contentLength: 0)
        return scan(seed, accumulator: { latestDownloadTaskInfo, currentDataTaskInfo in
            var downloadedProgress: Float = 0
            var contentLength: Int64 = 0

            if let response = currentDataTaskInfo.response {
                // Start
                contentLength = response.expectedContentLength
            }
            else if let data = currentDataTaskInfo.data {
                // Accumulate
                contentLength = latestDownloadTaskInfo.contentLength
                latestDownloadTaskInfo.buffer.appendData(data)
                downloadedProgress = Float(latestDownloadTaskInfo.buffer.length) / Float(contentLength)
            }

            if contentLength <= 0 {
                throw NSURLError.ZeroByteResource
            }

            // Accumulated info
            return NetworkDownloadTaskInfo(
                task: currentDataTaskInfo.task,
                buffer: latestDownloadTaskInfo.buffer,
                progress: downloadedProgress,
                contentLength: contentLength
            )
        })
    }
}
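
To make the accumulation step easier to follow, here is a minimal sketch of the same logic in plain Swift, without RxSwift. DataTaskEvent, DownloadState, DownloadError and accumulate are hypothetical simplifications of the types above, and the loop plays the role that scan plays in the operator.

```swift
import Foundation

// Hypothetical, simplified event: a response header or a chunk of data.
enum DataTaskEvent {
    case response(expectedContentLength: Int64)
    case data(Data)
}

// Hypothetical accumulated state, mirroring NetworkDownloadTaskInfo.
struct DownloadState {
    var buffer = Data()
    var contentLength: Int64 = 0
    var progress: Float = 0
}

enum DownloadError: Error {
    case zeroByteResource
}

// One accumulation step: takes the previous state and the current event,
// and returns the next state (the same shape as a scan accumulator).
func accumulate(_ state: DownloadState, _ event: DataTaskEvent) throws -> DownloadState {
    var next = state
    switch event {
    case .response(let expected):
        // Start: remember how many bytes to expect.
        guard expected > 0 else { throw DownloadError.zeroByteResource }
        next.contentLength = expected
        next.progress = 0
    case .data(let chunk):
        // Accumulate: append the chunk and recompute the progress.
        guard next.contentLength > 0 else { throw DownloadError.zeroByteResource }
        next.buffer.append(chunk)
        next.progress = Float(next.buffer.count) / Float(next.contentLength)
    }
    return next
}

let events: [DataTaskEvent] = [
    .response(expectedContentLength: 4),
    .data(Data([0x01, 0x02])),
    .data(Data([0x03, 0x04]))
]

var state = DownloadState()
var progressLog: [Float] = []
for event in events {
    state = try accumulate(state, event)
    progressLog.append(state.progress)
}
print(progressLog) // [0.0, 0.5, 1.0]
```

Unlike the operator above, this sketch keeps the buffer in a value type, so each step returns a new state instead of mutating a shared NSMutableData. That is a choice made for the sketch, not something the original code requires.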

Upvotes: 1
