Rob
Rob

Reputation: 437917

How to constrain concurrency (like `maxConcurrentOperationCount`) with Swift Concurrency?

I am trying to perform a series of network requests and would like to limit the number of concurrent tasks in the new Swift Concurrency system. With operation queues we would use maxConcurrentOperationCount. In Combine, flatMap(maxPublishers:_:). What is the equivalent in the new Swift Concurrency system?

E.g., it is not terribly relevant, but consider:

func downloadAll() async throws {
    try await withThrowingTaskGroup(of: Void.self) { group in
        for index in 0..<20 {
            group.addTask { try await self.download(index) }
        }

        try await group.waitForAll()
    }
}

That results in all the requests running concurrently:

enter image description here

The fact that URLSession is not honoring httpMaximumConnectionsPerHost is interesting, but not the salient issue here. I am looking for, more generally, how to constrain the degree of concurrency in a series of asynchronous tasks running in parallel.

Upvotes: 25

Views: 4634

Answers (1)

Rob
Rob

Reputation: 437917

One can insert a group.next() call inside the loop after reaching a certain count, e.g.:

func downloadAll() async throws {
    try await withThrowingTaskGroup(of: Void.self) { group in
        for index in 0..<20 {
            if index >= 6 { try await group.next() }
            group.addTask { try await self.download(index) }
        }

        try await group.waitForAll()
    }
}

That results in no more than six at a time:

enter image description here


For the sake of completeness, I should note that in WWDC 2023 Beyond the basics of structured concurrency, Apple suggests an alternative pattern:

withTaskGroup(of: Something.self) { group in
    for _ in 0 ..< maxConcurrentTasks {
        group.addTask { … }
    }
    while let <partial result> = await group.next() {
        if !shouldStop {
            group.addTask { … }
        }
    }
}

Which, in this example, might translate to:

func downloadAll() async throws {
    try await withThrowingTaskGroup(of: Void.self) { group in
        for index in 0..<6 {
            group.addTask { try await self.download(index) }
        }
        var index = 6
        while try await group.next() != nil {
            if index < 20 {
                group.addTask { [index] in try await self.download(index) }
            }
            index += 1
        }
    }
}

Yielding (in Instruments):

enter image description here

The idea is very similar, namely that you group.addTask {…} up to the max desired concurrency, but then group.next() before adding each subsequent task. It is another way to crack the nut.

Upvotes: 40

Related Questions