zaphoyd

Reputation: 2750

Integrate a blocking function into Swift async

I understand that an asynchronous Swift Task is not supposed to block (async worker threads must always make forward progress). If I have an otherwise 100% async Swift application but need to introduce some blocking tasks, what is the correct way to do this without blocking any of the Swift async thread pool workers?

I'm assuming a new dedicated thread outside of the async thread pool is required. If that assumption is correct, what is the thread-safe way for an async function to await that thread's completion? Can I use the body of withCheckedContinuation to launch a thread, copy the continuation handle into that thread and call continuation.resume from that thread when it completes?

Upvotes: 6

Views: 5450

Answers (2)

Rob

Reputation: 437592

You said:

I'm assuming a new dedicated thread outside of the async thread pool is required …

I would not jump to that conclusion. It depends entirely upon the nature of this “blocking task”.

  1. If it is simply some slow, synchronous task (e.g. a CPU-intensive task), then you can stay within the Swift concurrency system and perform this synchronous task within a detached task or an actor. If you do this, though, you must periodically call await Task.yield() within this task in order to fulfill the Swift concurrency contract to “ensure forward progress”.

  2. If it is some blocking API that will wait/sleep on that thread, then, as Itai suggested, we would wrap it in a continuation, ideally replacing that blocking API with a non-blocking one. If it is not practical to replace the blocking API with an asynchronous rendition, then, yes, you could spin up your own thread for it, effectively making it an asynchronous task, and wrap that in a continuation.
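As a minimal sketch of the first case, a CPU-bound loop can yield every so often so that other tasks get a turn. The function name sumWithYield and the chunk size are hypothetical, chosen only for illustration:

```swift
// Hypothetical CPU-bound work: sums the integers in 0..<n.
// The name and the default chunk size are assumptions for this sketch.
func sumWithYield(upTo n: Int, yieldEvery chunk: Int = 10_000) async -> Int {
    var total = 0
    for i in 0..<n {
        total &+= i
        // After every `chunk` iterations, suspend briefly so that other
        // tasks on the cooperative thread pool can be interleaved.
        if i % chunk == chunk - 1 {
            await Task.yield()
        }
    }
    return total
}
```

A caller could run this off the current actor with something like `await Task.detached { await sumWithYield(upTo: 1_000_000) }.value`. How often to yield is a tuning decision: yielding too rarely starves other tasks, and yielding too often adds overhead.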


But, in the context of long-running, blocking, computations, if one cannot periodically yield, SE-0296 - Async/await says that one should run it in a “separate context” (emphasis added):

Because potential suspension points can only appear at points explicitly marked within an asynchronous function, long computations can still block threads. This might happen when calling a synchronous function that just does a lot of work, or when encountering a particularly intense computational loop written directly in an asynchronous function. In either case, the thread cannot interleave code while these computations are running, which is usually the right choice for correctness, but can also become a scalability problem. Asynchronous programs that need to do intense computation should generally run it in a separate context. When that’s not feasible, there will be library facilities to artificially suspend and allow other operations to be interleaved.

For long computations that cannot yield, then, the proposal suggests that one “run it in a separate context”. For example, in WWDC 2022’s Visualize and optimize Swift concurrency, Apple explicitly advises moving the blocking code out of the Swift concurrency system:

Be sure to avoid blocking calls in tasks. … If you have code that needs to do these things, move that code outside of the concurrency thread pool – for example, by running it on a DispatchQueue – and bridge it to the concurrency world using continuations.

So, for example, you can do the following (assuming Value is Sendable):

func foo() async -> Value {
    await withCheckedContinuation { continuation in
        DispatchQueue.global(qos: .utility).async {
            let value = slowAndSynchronous()
            continuation.resume(returning: value)
        }
    }
}
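If the blocking function can throw, the same pattern might be written with withCheckedThrowingContinuation. This is only a sketch: slowParse and ParseError are hypothetical stand-ins for whatever blocking, throwing call you actually have.

```swift
import Foundation

// Hypothetical error type and blocking function, for illustration only.
struct ParseError: Error {}

func slowParse(_ text: String) throws -> Int {
    Thread.sleep(forTimeInterval: 0.05)   // simulates a blocking wait
    guard let number = Int(text) else { throw ParseError() }
    return number
}

func parse(_ text: String) async throws -> Int {
    try await withCheckedThrowingContinuation { continuation in
        // Run the blocking call on a GCD thread, not the cooperative pool.
        DispatchQueue.global(qos: .utility).async {
            do {
                continuation.resume(returning: try slowParse(text))
            } catch {
                continuation.resume(throwing: error)
            }
        }
    }
}
```

Either way, the continuation must be resumed exactly once on every path, which is why both the success and the failure branch call resume.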

Alternatively, nowadays (Swift 5.9 and later, per SE-0392) you can get it out of the Swift concurrency cooperative thread pool using an actor with a custom executor:

actor Foo {
    private let queue = DispatchSerialQueue(label: Bundle.main.bundleIdentifier! + ".Foo", qos: .utility)

    nonisolated var unownedExecutor: UnownedSerialExecutor {
        queue.asUnownedSerialExecutor()
    }

    func foo() -> Value {
        // you can confirm that you are not in the cooperative thread pool
        //
        // dispatchPrecondition(condition: .onQueue(queue))

        return slowAndSynchronous()
    }
}

Bottom line, if the synchronous function cannot periodically yield to allow interleaving, move this code outside of the cooperative thread pool via one of the mechanisms outlined above.

Just be aware that Swift concurrency cannot reason about other threads outside of its scope and will continue to fully avail itself of the cooperative thread pool, leading to a potential over-commit of the CPU (one of the problems the cooperative thread pool was designed to solve). This is suboptimal, so some care is needed, but it may be a necessary evil if you are using Swift concurrency in conjunction with a blocking or computationally intensive library designed without the cooperative thread pool in mind.
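One way to exercise that care, sketched here under my own assumptions rather than taken from the proposal or the WWDC session, is to funnel all blocking calls through a single shared serial queue, so that at most one extra thread is busy with blocking work at any moment. The BlockingWork namespace and its queue label are hypothetical:

```swift
import Foundation

// Hypothetical helper: a single serial queue shared by all blocking calls.
enum BlockingWork {
    // Serial, so blocking jobs run one at a time and never fan out
    // into many extra threads competing with the cooperative pool.
    static let queue = DispatchQueue(label: "example.blocking-work", qos: .utility)

    static func run<T: Sendable>(_ work: @escaping @Sendable () -> T) async -> T {
        await withCheckedContinuation { continuation in
            queue.async {
                continuation.resume(returning: work())
            }
        }
    }
}
```

The trade-off is throughput: blocking calls queue up behind one another. That may be acceptable when the goal is to keep the rest of the app responsive.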

Upvotes: 2

Itai Ferber

Reputation: 29833

I understand that an asynchronous Swift Task is not supposed to block (async worker threads must always make forward progress).

This is correct. The cornerstone of the Swift concurrency system is that tasks must always be making forward progress.

Can I use the body of withCheckedContinuation to launch a thread, copy the continuation handle into that thread and call continuation.resume from that thread when it completes?

Yes, this is also correct, and is exactly the purpose of continuations:

CheckedContinuation
A mechanism to interface between synchronous and asynchronous code, logging correctness violations.

The purpose of a continuation is to allow you to fit blocking synchronous operations into the async world. When you call withCheckedContinuation, it

[s]uspends the current task, then calls the given closure with a checked continuation for the current task.

The task is suspended indefinitely until you resume it, which allows other tasks to make progress in the meantime. The continuation value you get is a thread-safe interface to indicate that your blocking operation is done, and that the original task should resume at the next available opportunity. The continuation is also Sendable, which indicates that you can safely pass it between threads. Any thread is allowed to resume the task, so you don't even necessarily need to call back to the continuation on the same thread.
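A minimal sketch of exactly what the question describes, launching a dedicated thread from the continuation body and resuming from that thread, could look like this. Here blockingRead is a hypothetical stand-in for the real blocking call:

```swift
import Foundation

// Hypothetical blocking call; the sleep simulates waiting on some resource.
func blockingRead() -> Int {
    Thread.sleep(forTimeInterval: 0.1)
    return 42
}

func readOnDedicatedThread() async -> Int {
    await withCheckedContinuation { (continuation: CheckedContinuation<Int, Never>) in
        // The closure runs immediately on the caller's thread; it only
        // starts the new thread and returns, so nothing blocks here.
        let thread = Thread {
            let value = blockingRead()
            // Resume exactly once, from the dedicated thread.
            continuation.resume(returning: value)
        }
        thread.name = "blocking-read"
        thread.start()
    }
}
```

While the dedicated thread is blocked, the awaiting task is suspended and its cooperative-pool thread is free to run other tasks.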

An example usage from SE-0300: Continuations for interfacing async tasks with synchronous code:

func operation() async -> OperationResult {
  // Suspend the current task, and pass its continuation into a closure
  // that executes immediately
  return await withUnsafeContinuation { continuation in
    // Invoke the synchronous callback-based API...
    beginOperation(completion: { result in
      // ...and resume the continuation when the callback is invoked
      continuation.resume(returning: result)
    })
  }
}

Note that this is only necessary for tasks which are truly blocking, and cannot make further progress until something they depend on is done. This is different from tasks which perform active computation which just happen to take a long time, as those are tasks which are at least making active progress. (But in chat, you clarify that your use-case is the former.)

Upvotes: 6
