Tamas

Reputation: 3442

How to make an async Swift function "@synchronized"?

I'd like to create an async function that itself uses async calls. I also want to ensure that only one call is actively processed at any moment. So I want an async @synchronized function.

How do I do that? Wrapping the function's body inside dispatchQueue.sync {} does not work, as it expects synchronous code. Also, it seems that DispatchQueue in general is not designed to execute async code blocks / tasks.

This code communicates with hardware, so it is async in nature; that's why I want an async interface for my library. (I don't want to block the app while the stages of communication happen.) But certain operations can't be executed in parallel on the hardware, so I need some form of synchronisation to make sure those operations never happen at the same time.

Upvotes: 3

Views: 4261

Answers (2)

Rob

Reputation: 437592

My original answer (in which I suggest awaiting the prior task) is below. It is a simple pattern that works well, but it relies on unstructured concurrency, which complicates cancellation workflows.

Nowadays, I would use an AsyncSequence, e.g., an AsyncStream. See WWDC 2021 video Meet AsyncSequence. Or, for queue-like behavior (where we set up the queue initially and later append items to it), more often than not, I reach for AsyncChannel from the Swift Async Algorithms package. See WWDC 2022 video Meet Swift Async Algorithms.

E.g., I can create an AsyncChannel for URLs that I want to download:

let urls = AsyncChannel<URL>()

Now that I have a channel, I can set up a task to process them serially with a for-await-in loop:

func processUrls() async {
    for await url in urls {
        await download(url)
    }
}

And, when I later want to add something to that channel to be processed, I can send to that channel:

func append(_ url: URL) async {
    await urls.send(url)
}
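
AsyncChannel comes from an external package. As a rough sketch of the same "one consumer loop, append later" shape using only the standard library, the following uses AsyncStream instead. The names Recorder, SerialDownloads, and the download closure are hypothetical and not part of the answer above, URLs are represented as plain strings to keep the sketch dependency-free, and makeStream(of:) assumes Swift 5.9 or later.

```swift
// Hypothetical helper: records events so the processing order can be inspected.
actor Recorder {
    private(set) var events: [String] = []
    func record(_ event: String) { events.append(event) }
}

// Hypothetical type: a serial queue of "downloads" backed by AsyncStream.
final class SerialDownloads {
    private let continuation: AsyncStream<String>.Continuation
    private let worker: Task<Void, Never>

    init(download: @escaping @Sendable (String) async -> Void) {
        // makeStream(of:) is assumed to be available (Swift 5.9 or later).
        let (stream, continuation) = AsyncStream.makeStream(of: String.self)
        self.continuation = continuation
        // A single consumer loop: each download is awaited before the next begins.
        self.worker = Task {
            for await url in stream {
                await download(url)
            }
        }
    }

    // Unlike AsyncChannel.send, yield does not suspend; the value is buffered.
    func append(_ url: String) {
        continuation.yield(url)
    }

    // Closes the stream and waits until the buffered items have been processed.
    func finish() async {
        continuation.finish()
        await worker.value
    }
}
```

One difference worth keeping in mind: sending to an AsyncChannel suspends the sender until the value is consumed (back pressure), whereas the stream here buffers values without limit by default.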

The original approach: you can have every Task await the prior one. And you can use an actor to make sure that you are only running one at a time. The trick is that, because of actor reentrancy, you have to put that "await prior Task" logic in a synchronous method.

E.g., you can do:

actor Experiment {
    private var previousTask: Task<Void, Error>?

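    // Synchronous on purpose: with no suspension point inside, reading and
    // replacing `previousTask` cannot interleave with another call.
    func startSomethingAsynchronous() {
        previousTask = Task { [previousTask] in
            // Wait for the prior task to finish, whether it succeeded or failed.
            let _ = await previousTask?.result
            try await self.doSomethingAsynchronous()
        }
    }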

    private func doSomethingAsynchronous() async throws {
        let id = OSSignpostID(log: log)
        os_signpost(.begin, log: log, name: "Task", signpostID: id, "Start")
        try await Task.sleep(nanoseconds: 2 * NSEC_PER_SEC)
        os_signpost(.end, log: log, name: "Task", signpostID: id, "End")
    }
}
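
To illustrate why reentrancy matters here, the following is a small sketch that contrasts an async actor method with the synchronous "chain onto the prior task" approach. NaiveSerial, ChainedSerial, perform, and waitUntilIdle are hypothetical names made up for this illustration, and the sleep stands in for real asynchronous work.

```swift
// Hypothetical actor: an async method alone does not serialize the work.
actor NaiveSerial {
    private(set) var events: [String] = []

    // The actor is released at every `await`, so a second call can start
    // while the first one is still suspended in the sleep.
    func perform(_ name: String) async {
        events.append("start \(name)")
        try? await Task.sleep(nanoseconds: 50_000_000)
        events.append("end \(name)")
    }
}

// Hypothetical actor: each new task awaits the previously stored task.
actor ChainedSerial {
    private(set) var events: [String] = []
    private var previousTask: Task<Void, Never>?

    // Synchronous: capturing and replacing `previousTask` happens in one
    // uninterrupted step, so every task waits for the one enqueued before it.
    func perform(_ name: String) {
        previousTask = Task<Void, Never> { [previousTask] in
            _ = await previousTask?.value
            await self.work(name)
        }
    }

    private func work(_ name: String) async {
        events.append("start \(name)")
        try? await Task.sleep(nanoseconds: 50_000_000)
        events.append("end \(name)")
    }

    // Waits for the most recently enqueued task, and therefore for all earlier ones.
    func waitUntilIdle() async {
        _ = await previousTask?.value
    }
}
```

With two overlapping calls, the naive actor would be expected to log both "start" entries before either "end" entry, while the chained actor finishes each piece of work before starting the next.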

The Experiment actor uses os_signpost so that I can watch this serial behavior in Xcode Instruments. You could start three tasks like so:

import os.log

private let log = OSLog(subsystem: "Experiment", category: .pointsOfInterest)

class ViewController: NSViewController {

    let experiment = Experiment()

    func startExperiment() {
        for _ in 0 ..< 3 {
            Task { await experiment.startSomethingAsynchronous() }
        }
        os_signpost(.event, log: log, name: "Done starting tasks")
    }

    ... 
}

Instruments can then visually demonstrate the sequential behavior: the signpost event marks the point where submitting all the tasks finished, and the timeline shows the tasks executing one after another.


I actually like to abstract this serial behavior into its own type:

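// `Success` is constrained to Sendable because Task requires it for its result type.
actor SerialTasks<Success: Sendable> {
    private var previousTask: Task<Success, Error>?

    // Synchronous, so that chaining onto `previousTask` cannot be interleaved.
    func add(block: @Sendable @escaping () async throws -> Success) {
        previousTask = Task { [previousTask] in
            // Wait for the prior task, ignoring its outcome, before running this block.
            let _ = await previousTask?.result
            return try await block()
        }
    }
}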

And then the asynchronous function for which you need this serial behavior would use the above, e.g.:

class Experiment {
    let serialTasks = SerialTasks<Void>()

    func startSomethingAsynchronous() async {
        await serialTasks.add {
            try await self.doSomethingAsynchronous()
        }
    }

    private func doSomethingAsynchronous() async throws {
        let id = OSSignpostID(log: log)
        os_signpost(.begin, log: log, name: "Task", signpostID: id, "Start")
        try await Task.sleep(nanoseconds: 2 * NSEC_PER_SEC)
        os_signpost(.end, log: log, name: "Task", signpostID: id, "End")
    }
}

Upvotes: 7

CrossProduct

Reputation: 131

The accepted answer does indeed make tasks run serially, but the caller cannot wait for a task to finish, and errors are not propagated to it. I use the following alternative, which supports awaiting the result and catching errors.

actor SerialTaskQueue<T> {
    private let dispatchQueue: DispatchQueue

    init(label: String) {
        dispatchQueue = DispatchQueue(label: label)
    }
    
    @discardableResult
    func add(block: @Sendable @escaping () async throws -> T) async throws -> T {
        try await withCheckedThrowingContinuation { continuation in
            dispatchQueue.sync {
                let semaphore = DispatchSemaphore(value: 0)

                Task {
                    defer {
                        semaphore.signal()
                    }

                    do {
                        let result = try await block()

                        continuation.resume(returning: result)
                    } catch {
                        continuation.resume(throwing: error)
                    }
                }

                semaphore.wait()
            }
        }
    }
}
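
Waiting on a semaphore blocks a thread while the Task runs, which Swift concurrency generally discourages. As a hedged alternative sketch, the "await the prior task" pattern can also hand the result and any thrown error back to the caller without blocking. AwaitableSerialQueue is a hypothetical name for this illustration, and the sketch does not forward cancellation of the calling task to the enqueued work.

```swift
// Hypothetical type: serial execution where the caller awaits the outcome.
actor AwaitableSerialQueue<T: Sendable> {
    private var previousTask: Task<T, Error>?

    @discardableResult
    func add(_ block: @escaping @Sendable () async throws -> T) async throws -> T {
        // Everything before the first `await` in this method runs without
        // interleaving on the actor, so the chain is built in call order.
        let task = Task<T, Error> { [previousTask] in
            // Wait for the prior task, regardless of whether it failed.
            _ = await previousTask?.result
            return try await block()
        }
        previousTask = task
        // Suspends the caller (without blocking a thread) until this block
        // has run, then returns its value or rethrows its error.
        return try await task.value
    }
}
```

A failing block only affects its own caller; later blocks still run, because each task waits on the prior task's result rather than its value.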

Upvotes: -2