SwiftCoder

Reputation: 3

Limiting the number of fetch requests in URLSession with SwiftUI?

I'm using an async image loader to fetch images from a URLRequest, and I'm trying to wrap my code inside of an Operation so I can use .maxConcurrentOperationCount for an OperationQueue, because I'm supposed to limit the number of downloads to 3 at a time.

I've overridden the Operation class to try to support async downloads. However, I'm not able to achieve this, and I think it's because my downloading function is inside of a Task group.

The error I get is as follows:

Invalid conversion from 'async' function of type '(URL?, URLResponse?, (any Error)?) async throws -> Void' to synchronous function type '(URL?, URLResponse?, (any Error)?) -> Void'

Here are the code snippets:

For the overridden Operation class:


class DownloadOperation: Operation {
    private var task: URLSessionDataTask!
    
    init(session: URLSession, downloadTaskURL: URLRequest, completionHandler: ((URL?, URLResponse?, Error?) -> Void)?) {
           super.init()
           
           // use weak self to prevent retain cycle
           task = session.dataTask(with: downloadTaskURL, completionHandler: { [weak self] (URLRequest, response, error) in
               
   
               
              /*
                set the operation state to finished once
                the download task is completed or have error
              */
               self?.state = .finished
           })
       }
    
    enum OperationState : Int {
            case ready
            case executing
            case finished
        }

    private var state : OperationState = .ready {
          willSet {
              self.willChangeValue(forKey: "isExecuting")
              self.willChangeValue(forKey: "isFinished")
          }
          
          didSet {
              self.didChangeValue(forKey: "isExecuting")
              self.didChangeValue(forKey: "isFinished")
          }
      }
      
      override var isReady: Bool { return state == .ready }
      override var isExecuting: Bool { return state == .executing }
      override var isFinished: Bool { return state == .finished }
    
    
    override func start() {
         /*
         if the operation or queue got cancelled even
         before the operation has started, set the
         operation state to finished and return
         */
         if(self.isCancelled) {
             state = .finished
             return
         }
         
         // set the state to executing
         state = .executing
         
         print("downloading")
               
         // start the downloading
         self.task.resume()
     }

     override func cancel() {
         super.cancel()
       
         // cancel the downloading
         self.task.cancel()
     }
}

and here is me trying to use it inside of a task in the loader function:

  public func loadImage(_ urlRequest: URLRequest) async throws -> UIImage {
        if let status = images[urlRequest]{
            switch status{
            case .fetched(let image):
                return image
            case .inProgress(let task):
                return try await task.value
            case .failure(let error):
                self.hasError = true
                self.error = error as? InternetError
            }
        }
        
        
        let task: Task<UIImage, Error> = Task {
            do {
                let imageQueue = OperationQueue()
                imageQueue.maxConcurrentOperationCount = 3
                
                let operation = DownloadOperation(session: URLSession.shared, downloadTaskURL: urlRequest, completionHandler: {_, response ,_ in
                    let (imageData, response) = try await URLSession.shared.data(for: urlRequest)

                    guard let httpResponse = response as? HTTPURLResponse,
                          httpResponse.statusCode == 200 else {
                        throw InternetError.invalidServerResponse
                    }
                    guard let image = UIImage(data: imageData) else {
                        throw InternetError.noInternet
                    }
                    
                })
                imageQueue.addOperation(operation)
                
                
                
               // return image
            }
            catch {
                self.hasError = true
                images[urlRequest] = .failure(error)
                print("error caught in Loader")
                let image = UIImage(systemName: "wifi.exclamationmark")!
                return image
            }
        }
        
        do{
            images[urlRequest] = .inProgress(task)
            var image = try await task.value
            if let imageFromCache = imageCache.object(forKey: urlRequest as AnyObject) as? UIImage {
                image = imageFromCache
                return image
            }
            images[urlRequest] = .fetched(image)
            //storing image in cache
            imageCache.setObject(image, forKey: urlRequest as AnyObject)
            return image
        }
    }
}

I would appreciate any help about this! Thank you!!

Upvotes: 0

Views: 929

Answers (2)

Rob

Reputation: 438232

There are several issues:

  1. You are creating a new operation queue every time you call loadImage, rendering the maxConcurrentOperationCount moot. E.g., if you quickly request five images, you will end up with five operation queues, each with one operation on them, and they will run concurrently, with none of the five queues exceeding their respective maxConcurrentOperationCount.

    You must remove the local variable declaration of the operation queue from the function, and make it a property.

  2. DownloadOperation is starting a dataTask but not calling the completion handler. Also, when you create the DownloadOperation you are supplying a completion handler in which you start yet another download. If you are going to use an Operation to encapsulate the download, you should not have any URLSession code in the completion handler. Use the parameters passed to it.

  3. The asynchronous operation is not thread-safe. One must synchronize access to its shared state property.

Thus, perhaps:

var images: [URLRequest: ImageRequest] = [:]

let queue: OperationQueue = {
    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 3
    return queue
}()

let session: URLSession = .shared

public func loadImage(_ request: URLRequest) async throws -> UIImage {
    switch images[request] {
    case .fetched(let image):
        return image

    case .inProgress(let task):
        return try await task.value

    case .failure(let error):
        throw error

    case nil:
        let task: Task<UIImage, Error> = Task {
            try await withCheckedThrowingContinuation { continuation in
                let operation = ImageRequestOperation(session: session, request: request) { [weak self] result in
                    DispatchQueue.main.async {
                        switch result {
                        case .failure(let error):
                            self?.images[request] = .failure(error)
                            continuation.resume(throwing: error)

                        case .success(let image):
                            self?.images[request] = .fetched(image)
                            continuation.resume(returning: image)
                        }
                    }
                }

                queue.addOperation(operation)
            }
        }

        images[request] = .inProgress(task)

        return try await task.value
    }
}

The async-await code above uses the following operation:

class ImageRequestOperation: DataRequestOperation {
    init(session: URLSession, request: URLRequest, completionHandler: @escaping (Result<UIImage, Error>) -> Void) {
        super.init(session: session, request: request) { result in
            switch result {
            case .failure(let error):
                DispatchQueue.main.async { completionHandler(.failure(error)) }

            case .success(let data):
                guard let image = UIImage(data: data) else {
                    DispatchQueue.main.async { completionHandler(.failure(URLError(.badServerResponse))) }
                    return
                }

                DispatchQueue.main.async { completionHandler(.success(image)) }
            }
        }
    }
}

The above separates the image-related part from the network-related part, which follows:

class DataRequestOperation: AsynchronousOperation {
    private var task: URLSessionDataTask!

    init(session: URLSession, request: URLRequest, completionHandler: @escaping (Result<Data, Error>) -> Void) {
        super.init()

        task = session.dataTask(with: request) { data, response, error in
            // Finish the operation on every exit path, including failure and
            // cancellation; otherwise it would occupy a queue slot forever.
            defer { self.finish() }

            guard
                let data = data,
                let response = response as? HTTPURLResponse,
                200 ..< 300 ~= response.statusCode
            else {
                completionHandler(.failure(error ?? URLError(.badServerResponse)))
                return
            }

            completionHandler(.success(data))
        }
    }

    override func main() {
        task.resume()
    }

    override func cancel() {
        super.cancel()

        task.cancel()
    }
}

That in turn inherits from an AsynchronousOperation, which separates the asynchronous-operation boilerplate (below) from the substance of what the operation does (above):

/// AsynchronousOperation
///
/// Encapsulate the basic asynchronous operation logic in its own class, to avoid cluttering
/// your concrete implementations with a ton of boilerplate code.

class AsynchronousOperation: Operation {
    enum OperationState: Int {
        case ready
        case executing
        case finished
    }

    @Atomic var state: OperationState = .ready {
        willSet {
            willChangeValue(forKey: #keyPath(isExecuting))
            willChangeValue(forKey: #keyPath(isFinished))
        }

        didSet {
            didChangeValue(forKey: #keyPath(isFinished))
            didChangeValue(forKey: #keyPath(isExecuting))
        }
    }

    override var isReady: Bool        { state == .ready && super.isReady }
    override var isExecuting: Bool    { state == .executing }
    override var isFinished: Bool     { state == .finished }
    override var isAsynchronous: Bool { true }

    override func start() {
        if isCancelled {
            state = .finished
            return
        }

        state = .executing

        main()
    }

    /// Subclasses should override this method, but *not* call this `super` rendition.

    override func main() {
        assertionFailure("The `main` method should be overridden in concrete subclasses of this abstract class.")
    }

    func finish() {
        state = .finished
    }
}

Note that I addressed the lack of thread-safe access to the state using this property wrapper:

/// Atomic
///
/// Property wrapper providing atomic interface.
///
/// - Note: It is advised to use this with value types only. If you use reference types, the object could theoretically be mutated beyond the knowledge of this property wrapper, losing atomic behavior.

@propertyWrapper
struct Atomic<T> {
    private var _wrappedValue: T
    private let lock = NSLock()

    var wrappedValue: T {
        get { lock.withLock { _wrappedValue } }
        set { lock.withLock { _wrappedValue = newValue } }
    }

    init(wrappedValue: T) {
        _wrappedValue = wrappedValue
    }
}

This yields asynchronous behavior with a maximum concurrency count of 3. E.g., here I download 10 images, then another 10, and then another 20:

[Image: the downloads described above]
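
As a rough way to check the concurrency cap without any networking, here is a minimal sketch. It assumes plain block operations with a short sleep standing in for a download, so it does not exercise the AsynchronousOperation subclass above; ConcurrencyProbe and peakConcurrency are hypothetical names introduced only for this illustration.

```swift
import Foundation

/// Hypothetical helper: records how many operations are running at once.
final class ConcurrencyProbe: @unchecked Sendable {
    private let lock = NSLock()
    private var current = 0
    private var highest = 0

    /// Highest number of simultaneously running operations seen so far.
    var peak: Int {
        lock.lock(); defer { lock.unlock() }
        return highest
    }

    func enter() {
        lock.lock(); defer { lock.unlock() }
        current += 1
        highest = max(highest, current)
    }

    func leave() {
        lock.lock(); defer { lock.unlock() }
        current -= 1
    }
}

/// Runs `jobs` fake downloads on ONE shared queue and reports the peak concurrency.
func peakConcurrency(jobs: Int, limit: Int) -> Int {
    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = limit
    let probe = ConcurrencyProbe()

    for _ in 0 ..< jobs {
        queue.addOperation {
            probe.enter()
            Thread.sleep(forTimeInterval: 0.05)  // stand-in for a download
            probe.leave()
        }
    }

    // Block until every operation has completed, then read the result.
    queue.waitUntilAllOperationsAreFinished()
    return probe.peak
}

let observed = peakConcurrency(jobs: 10, limit: 3)
print("peak concurrency:", observed)
```

Because all ten operations go through the same queue, the reported peak should not exceed the limit. Creating a fresh queue per job instead (as in the question) would let all of them run at once.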

Upvotes: 1

altimes

Reputation: 430

An alternative using async functions, which I have used for a similar problem (an array of requests, without wanting to flood the server), is the following, in pseudo code:

let maxTasks = 20          // at most 20 concurrent tasks
let myTasks: [MyTask]      // array of data to be used in/as task
var next = 0               // index of the next task to start

let startingCount = min(maxTasks, myTasks.count)
await withTaskGroup(of: MyResult.self) { group in
    // start with a limited number of tasks
    for index in 0 ..< startingCount {
        group.addTask {
            let myResult = myTasks[index]  // stands in for running the task
            return myResult
        }
    }
    next = startingCount

    for await item in group {
        process(item)  // handle the result
        // as each task from the starting group completes and there is more to be run,
        // then add another
        if next < myTasks.count {
            let index = next  // copy: a child task must not capture the mutable counter
            group.addTask {
                let myResult = myTasks[index]
                return myResult
            }
            next += 1
        }
    }
}

The approach is to start some fixed number of concurrent tasks using withTaskGroup, then wait on the results. As each result comes in, take the next task off the list and add it to the current group. The number of concurrent tasks stays at or below maxTasks, because a new task is added only after an earlier one has finished. There may be issues with this approach that I am not aware of, but it seems to work OK so far.
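
For what it's worth, the same idea can be written as a self-contained sketch that also measures the peak number of tasks in flight. The names mapWithLimit, InFlightCounter and demo are hypothetical, and Task.sleep stands in for a real request; treat it as one possible shape under those assumptions rather than a definitive implementation.

```swift
import Foundation

/// Hypothetical helper: an actor keeps the in-flight counters race-free.
actor InFlightCounter {
    private var current = 0
    private(set) var peak = 0

    func enter() { current += 1; peak = max(peak, current) }
    func leave() { current -= 1 }
}

/// Runs `work` over `inputs` with at most `maxTasks` child tasks in flight,
/// returning the outputs in input order.
func mapWithLimit<Input: Sendable, Output: Sendable>(
    _ inputs: [Input],
    maxTasks: Int,
    work: @escaping @Sendable (Input) async -> Output
) async -> [Output] {
    precondition(maxTasks > 0, "maxTasks must be positive")

    return await withTaskGroup(of: (Int, Output).self) { group in
        var results = [Output?](repeating: nil, count: inputs.count)
        var next = 0

        // Seed the group with the first batch.
        while next < min(maxTasks, inputs.count) {
            let index = next  // copy: child tasks must not capture `next`
            group.addTask {
                let output = await work(inputs[index])
                return (index, output)
            }
            next += 1
        }

        // Each time a task finishes, start the next pending one.
        for await (finishedIndex, output) in group {
            results[finishedIndex] = output
            if next < inputs.count {
                let index = next
                group.addTask {
                    let output = await work(inputs[index])
                    return (index, output)
                }
                next += 1
            }
        }

        return results.compactMap { $0 }
    }
}

/// Doubles the numbers 1...10 with at most 3 tasks in flight.
func demo() async -> (values: [Int], peak: Int) {
    let counter = InFlightCounter()
    let values = await mapWithLimit(Array(1...10), maxTasks: 3) { (n: Int) -> Int in
        await counter.enter()
        try? await Task.sleep(nanoseconds: 20_000_000)  // stand-in for a request
        await counter.leave()
        return n * 2
    }
    return (values, await counter.peak)
}

let outcome = await demo()
print(outcome.values, "peak in flight:", outcome.peak)
```

Storing each output by its index keeps the results in input order even though tasks finish in arbitrary order.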

FWIW Alan.

Upvotes: 0
