Reputation: 3442
I'd like to create an async function which itself uses async calls. I also want to ensure that only one call is actively processed at any moment. So I want an async @synchronized function.
How can I do that? Wrapping the function's body inside dispatchQueue.sync {} does not work, as it expects synchronous code. Also, it seems that DispatchQueue in general is not designed to execute async code blocks / tasks.
This code communicates with hardware, so it is async in nature; that's why I want an async interface for my library. (I don't want to block the app while the stages of communication happen.) But certain operations can't be executed in parallel on the hardware, so I have to synchronise them so that they won't happen at the same time.
Upvotes: 3
Views: 4261
Reputation: 437592
My original answer (in which I suggest awaiting the prior task) is below. It is a simple pattern that works well, but is unstructured concurrency (complicating cancelation workflows).
Nowadays, I would use an AsyncSequence, e.g., an AsyncStream. See WWDC 2021 video Meet AsyncSequence. Or, for queue-like behavior (where we set up the queue initially and later append items to it), more often than not, I reach for AsyncChannel from the Swift Async Algorithms package. See WWDC 2022 video Meet Swift Async Algorithms.
E.g., I can create an AsyncChannel for URLs that I want to download:
let urls = AsyncChannel<URL>()
Now that I have a channel, I can set up a task to process them serially with a for-await-in loop:
func processUrls() async {
    for await url in urls {
        await download(url)
    }
}
And, when I later want to add something to that channel to be processed, I can send to that channel:
func append(_ url: URL) async {
    await urls.send(url)
}
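The AsyncStream option mentioned above can follow the same shape. This is a minimal sketch, not the answer's own code: the SerialURLQueue type, its handler parameter, and finish() are hypothetical names introduced for illustration, and AsyncStream.makeStream(of:) assumes Swift 5.9 or later.

```swift
import Foundation

// Hypothetical wrapper (assumption, not from the answer): a serial queue of
// URLs backed by AsyncStream rather than AsyncChannel.
final class SerialURLQueue: Sendable {
    private let stream: AsyncStream<URL>
    private let continuation: AsyncStream<URL>.Continuation
    private let handler: @Sendable (URL) async -> Void

    init(handler: @escaping @Sendable (URL) async -> Void) {
        // makeStream(of:) needs Swift 5.9+; the default buffering policy is unbounded.
        let (stream, continuation) = AsyncStream.makeStream(of: URL.self)
        self.stream = stream
        self.continuation = continuation
        self.handler = handler
    }

    // Unlike AsyncChannel.send, yield is synchronous: it buffers the element
    // and returns immediately.
    func append(_ url: URL) {
        continuation.yield(url)
    }

    // Ends the stream; the loop in processUrls exits once the buffer drains.
    func finish() {
        continuation.finish()
    }

    // Single consumer: each handler call completes before the next URL is pulled.
    func processUrls() async {
        for await url in stream {
            await handler(url)
        }
    }
}
```

In this sketch, processUrls() is meant to be started once (e.g., from a single Task), since the serial guarantee comes from having exactly one consumer loop. The main design difference from AsyncChannel is back pressure: send suspends until the element is consumed, whereas yield just buffers.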
You can have every Task await the prior one. And you can use an actor to make sure that you are only running one at a time. The trick is, because of actor reentrancy, you have to put that "await prior Task" logic in a synchronous method.
E.g., you can do:
actor Experiment {
    private var previousTask: Task<Void, Error>?

    func startSomethingAsynchronous() {
        previousTask = Task { [previousTask] in
            let _ = await previousTask?.result
            try await self.doSomethingAsynchronous()
        }
    }

    private func doSomethingAsynchronous() async throws {
        let id = OSSignpostID(log: log)
        os_signpost(.begin, log: log, name: "Task", signpostID: id, "Start")
        try await Task.sleep(nanoseconds: 2 * NSEC_PER_SEC)
        os_signpost(.end, log: log, name: "Task", signpostID: id, "End")
    }
}
Now I am using os_signpost so I can watch this serial behavior from Xcode Instruments. Anyway, you could start three tasks like so:
import os.log

private let log = OSLog(subsystem: "Experiment", category: .pointsOfInterest)

class ViewController: NSViewController {
    let experiment = Experiment()

    func startExperiment() {
        for _ in 0 ..< 3 {
            Task { await experiment.startSomethingAsynchronous() }
        }
        os_signpost(.event, log: log, name: "Done starting tasks")
    }
    ...
}
And Instruments can visually demonstrate the sequential behavior: the ⓢ signpost shows where the submitting of all the tasks finished, and the timeline shows the tasks then executing one after another.
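If Instruments is not at hand, the same serial behavior can be checked by recording events in order. The following is a rough sketch under stated assumptions: SerialExperiment, its events array, and waitForAll() are hypothetical names for illustration, and a short Task.sleep stands in for the real asynchronous work.

```swift
// Hypothetical variant of the actor above that logs begin/end events
// instead of emitting signposts.
actor SerialExperiment {
    private var previousTask: Task<Void, Never>?
    private(set) var events: [String] = []

    // Synchronous on purpose: there is no suspension point between reading
    // and replacing previousTask, so reentrancy cannot interleave here.
    func start(_ id: Int) {
        previousTask = Task { [previousTask] in
            _ = await previousTask?.value   // wait for the prior task, if any
            await self.run(id)
        }
    }

    // Waits for the most recently submitted task (and, transitively, all earlier ones).
    func waitForAll() async {
        _ = await previousTask?.value
    }

    private func run(_ id: Int) async {
        events.append("begin \(id)")
        try? await Task.sleep(nanoseconds: 50_000_000)   // stand-in for real work
        events.append("end \(id)")
    }
}
```

Submitting ids 0, 1, and 2 in order and then calling waitForAll() should leave events with each "begin" immediately followed by its matching "end", with no interleaving, because each task awaits its predecessor before doing any work.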
I actually like to abstract this serial behavior into its own type:
actor SerialTasks<Success> {
    private var previousTask: Task<Success, Error>?

    func add(block: @Sendable @escaping () async throws -> Success) {
        previousTask = Task { [previousTask] in
            let _ = await previousTask?.result
            return try await block()
        }
    }
}
And then the asynchronous function for which you need this serial behavior would use the above, e.g.:
class Experiment {
    let serialTasks = SerialTasks<Void>()

    func startSomethingAsynchronous() async {
        await serialTasks.add {
            try await self.doSomethingAsynchronous()
        }
    }

    private func doSomethingAsynchronous() async throws {
        let id = OSSignpostID(log: log)
        os_signpost(.begin, log: log, name: "Task", signpostID: id, "Start")
        try await Task.sleep(nanoseconds: 2 * NSEC_PER_SEC)
        os_signpost(.end, log: log, name: "Task", signpostID: id, "End")
    }
}
Upvotes: 7
Reputation: 131
The accepted answer does indeed make tasks run serially, but it does not wait for a task to finish and does not propagate errors. I use the following alternative, which lets the caller catch errors.
actor SerialTaskQueue<T> {
    private let dispatchQueue: DispatchQueue

    init(label: String) {
        dispatchQueue = DispatchQueue(label: label)
    }

    @discardableResult
    func add(block: @Sendable @escaping () async throws -> T) async throws -> T {
        try await withCheckedThrowingContinuation { continuation in
            dispatchQueue.sync {
                let semaphore = DispatchSemaphore(value: 0)
                Task {
                    defer {
                        semaphore.signal()
                    }
                    do {
                        // Hand the result (or the error) back to the awaiting caller.
                        let result = try await block()
                        continuation.resume(returning: result)
                    } catch {
                        continuation.resume(throwing: error)
                    }
                }
                // Note: this blocks the current thread until the task above signals.
                semaphore.wait()
            }
        }
    }
}
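Because semaphore.wait() blocks a thread until the block finishes, it may be worth comparing with a semaphore-free variant that still waits for the result and propagates errors. This is only a sketch built on the "await the prior task" pattern from the earlier answer; AwaitingSerialTasks is a hypothetical name, and the Sendable constraint on Success is an assumption added for stricter concurrency checking.

```swift
// Hypothetical variant: chains tasks like SerialTasks, but the caller
// awaits its own task and receives the value or the thrown error.
actor AwaitingSerialTasks<Success: Sendable> {
    private var previousTask: Task<Success, Error>?

    @discardableResult
    func add(_ block: @Sendable @escaping () async throws -> Success) async throws -> Success {
        // Everything up to the final await runs without suspension, so the
        // chain is updated atomically with respect to actor reentrancy.
        let task = Task<Success, Error> { [previousTask] in
            _ = await previousTask?.result   // ignore the prior task's outcome
            return try await block()
        }
        previousTask = task

        // Suspends (without blocking a thread) until this block has run.
        return try await task.value
    }
}
```

With this shape, `try await queue.add { ... }` returns the block's value or rethrows its error, and a failure in one block does not stop later blocks from running, since each task only waits for the prior task's result and discards it.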
Upvotes: -2