Reputation: 2114
I’m currently migrating my app to use the concurrency model in Swift. I want to serialize Tasks to make sure they are executed one after the other (no parallelism). In my use case, I want to listen to notifications posted by the NotificationCenter and execute a Task every time a new notification is posted, but only once any previous task has finished. It's the equivalent of using an OperationQueue with maxConcurrentOperationCount = 1.
For example, I’m using CloudKit with Core Data in my app and I use persistent history tracking to determine what changes have occurred in the store. In this Synchronizing a Local Store to the Cloud Sample Code, Apple uses an operation queue for handling history processing tasks (in CoreDataStack). This OperationQueue has a maximum number of operations set to 1.
private lazy var historyQueue: OperationQueue = {
    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 1
    return queue
}()
When a Core Data notification is received, a new operation is added to this serial operation queue. So if many notifications are received, the resulting operations are all performed serially, one after the other.
@objc
func storeRemoteChange(_ notification: Notification) {
    // Process persistent history to merge changes from other coordinators.
    historyQueue.addOperation {
        self.processPersistentHistory()
    }
}
In this Loading and Displaying a Large Data Feed Sample Code, Apple uses Tasks to handle history changes (in QuakesProvider).
// Observe Core Data remote change notifications on the queue where the changes were made.
notificationToken = NotificationCenter.default.addObserver(forName: .NSPersistentStoreRemoteChange, object: nil, queue: nil) { note in
    Task {
        await self.fetchPersistentHistory()
    }
}
I feel something is wrong in the second project, as Tasks could happen in any order, and not necessarily in a serial order (contrary to the first project, where the OperationQueue has a maxConcurrentOperationCount = 1).
Should we use an actor somewhere to make sure the methods are serially called?
I thought about an implementation like this but I’m not yet really comfortable with that:
actor PersistenceStoreListener {
    let historyTokenManager: PersistenceHistoryTokenManager = .init()
    private let persistentContainer: NSPersistentContainer

    init(persistentContainer: NSPersistentContainer) {
        self.persistentContainer = persistentContainer
    }

    func processRemoteStoreChange() async {
        print("\(#function) called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
    }
}
where the processRemoteStoreChange method would be called when a new notification is received (AsyncSequence):
notificationListenerTask = Task {
    let notifications = NotificationCenter.default.notifications(named: .NSPersistentStoreRemoteChange, object: container.persistentStoreCoordinator)
    for await _ in notifications {
        print("notificationListenerTask called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
        await self.storeListener?.processRemoteStoreChange()
    }
}
Upvotes: 11
Views: 5130
Reputation: 437917
Below, in my original answer, I answer the general question of how to achieve sequential behavior from independent tasks within Swift concurrency.
But you are asking a more specific question, namely, how to get serial behavior from an asynchronous sequence of events. If you have an AsyncSequence, such as notifications, then the for-await-in approach you contemplate at the end of your question is a great solution:
notificationListenerTask = Task {
    let notifications = NotificationCenter.default.notifications(named: .NSPersistentStoreRemoteChange, object: container.persistentStoreCoordinator)
    for await _ in notifications {
        await self.storeListener?.processRemoteStoreChange()
    }
}
Because you await within the loop, it will not get to the next iteration of the notifications AsyncSequence until the prior processRemoteStoreChange returns and execution of the loop continues.
Bottom line, an AsyncSequence (whether notifications or your own AsyncStream or AsyncChannel) is an excellent way to get serial behavior from an asynchronous series of events. The WWDC 2021 video Meet AsyncSequence is a great primer on asynchronous sequences for those unfamiliar with the AsyncSequence protocol.
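As a rough illustration of the same idea with your own AsyncStream, here is a minimal sketch. The names EventLog, process(_:log:), and runSerialStreamDemo() are hypothetical and exist only for this example, and the short sleep merely stands in for real asynchronous work. Because the loop awaits the handler, each event finishes before the next one is pulled from the stream:

```swift
// Hypothetical recorder, used only to observe the order of events.
actor EventLog {
    private(set) var entries: [String] = []
    func append(_ entry: String) { entries.append(entry) }
}

// Hypothetical handler; the sleep stands in for real asynchronous work.
func process(_ event: Int, log: EventLog) async {
    await log.append("start \(event)")
    try? await Task.sleep(nanoseconds: 50_000_000)
    await log.append("end \(event)")
}

func runSerialStreamDemo() async -> [String] {
    // All three events are yielded up front and buffered by the stream.
    let stream = AsyncStream<Int> { continuation in
        for event in 1...3 { continuation.yield(event) }
        continuation.finish()
    }

    let log = EventLog()
    for await event in stream {
        // The next element is not requested until this call returns.
        await process(event, log: log)
    }
    return await log.entries
}
```

Even though all events are available immediately, the recorded entries never interleave: every "start" is followed by its own "end" before the next event begins.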
In my original answer, below, I tackle the more general question of getting serial behavior from a series of independent Swift concurrency tasks:
If you want to get the behavior of an OperationQueue with a maxConcurrentOperationCount of 1 (a “serial” operation queue), one can achieve that with an actor.
There are two patterns that you will see with a serial OperationQueue:
The operations in the queue are, themselves, synchronous.
If you are using the standard OperationQueue (i.e., you have not written an Operation subclass that does manual KVO for isFinished, etc.), a simple actor achieves what we want. An actor will prevent concurrent execution.
The key here, though, is that this only works with synchronous methods (i.e., those methods that do not have await suspension points).
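To make that concrete, here is a minimal sketch of an actor whose only method is synchronous. HistoryProcessor and runSynchronousActorDemo() are hypothetical names for illustration, not part of any Apple sample. The isProcessing flag would catch two calls overlapping, which cannot happen because the method has no suspension point:

```swift
// Hypothetical actor: its method is synchronous, so calls cannot interleave.
actor HistoryProcessor {
    private(set) var processedCount = 0
    private(set) var overlapDetected = false
    private var isProcessing = false

    // No `await` inside, so there is no suspension point in this method.
    func process() {
        if isProcessing { overlapDetected = true }
        isProcessing = true
        processedCount += 1
        isProcessing = false
    }
}

func runSynchronousActorDemo() async -> (count: Int, overlapped: Bool) {
    let processor = HistoryProcessor()

    // Fire 100 calls from concurrently running child tasks.
    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<100 {
            group.addTask { await processor.process() }
        }
    }

    let count = await processor.processedCount
    let overlapped = await processor.overlapDetected
    return (count, overlapped)
}
```

The callers run concurrently, but the actor executes each process() call to completion before starting the next, much like a serial queue.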
The operations in the queue are asynchronous.
One of the more advanced use-cases of operation queues is to handle dependencies between tasks that are, themselves, asynchronous. This is a more complicated scenario in operation queues, requiring a custom Operation subclass in which you manually handle the KVO of isFinished, etc. (See this answer for an example of that pattern.)
The challenge in doing this with Swift concurrency is that actors are reentrant (see the reentrancy discussion in SE-0306). If the actor’s method is asynchronous (i.e., it uses async-await), that introduces suspension points, i.e., places where an await in one call will allow another async method to run on that actor.
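A small sketch can make the reentrancy visible. ReentrantWorker and runReentrancyDemo() are hypothetical names for illustration. The actor counts how many calls to its async method are in flight at once; while one call is suspended in the sleep, the actor is free to start another:

```swift
// Hypothetical actor whose async method contains a suspension point.
actor ReentrantWorker {
    private var inFlight = 0
    private(set) var maxInFlight = 0

    func work() async {
        inFlight += 1
        maxInFlight = max(maxInFlight, inFlight)
        // Suspension point: the actor can run other calls while this one sleeps.
        try? await Task.sleep(nanoseconds: 100_000_000)
        inFlight -= 1
    }
}

func runReentrancyDemo() async -> Int {
    let worker = ReentrantWorker()

    // Start three calls from concurrently running child tasks.
    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<3 {
            group.addTask { await worker.work() }
        }
    }
    return await worker.maxInFlight
}
```

In practice the returned peak is greater than 1 (typically 3 here), which shows that the actor alone does not serialize asynchronous methods.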
To achieve serial execution between separate async methods, you have a couple of options: await the prior Task; or use an AsyncSequence such as an AsyncStream or AsyncChannel (see https://stackoverflow.com/a/75730483/1271826).
Consider the following (which uses OS signposts so that I can graphically illustrate the behavior in Instruments):
import UIKit
import os.log

private let pointsOfInterest = OSSignposter(subsystem: "log", category: .pointsOfInterest)

class ViewController: UIViewController {
    private let example = Example()
    private let taskSerializer = SerialTasks<Void>()

    @IBAction func didTapSync(_ sender: Any) {
        let state = pointsOfInterest.beginInterval(#function, id: pointsOfInterest.makeSignpostID())
        startSynchronous()
        pointsOfInterest.endInterval(#function, state)
    }

    @IBAction func didTapAsync(_ sender: Any) {
        pointsOfInterest.emitEvent(#function)
        Task { try await startAsynchronous() }
    }

    @IBAction func didTapSerializedAsync(_ sender: Any) {
        pointsOfInterest.emitEvent(#function)
        Task { try await startSerializedAsynchronous() }
    }

    // Scenario 1: synchronous actor method.
    func startSynchronous() {
        Task {
            await example.synchronousExample("1. synchronous")
        }
    }

    // Scenario 2: asynchronous actor method (subject to reentrancy).
    func startAsynchronous() async throws {
        try await example.asynchronousExample("2. asynchronous")
    }

    // Scenario 3: asynchronous work chained through `SerialTasks`.
    func startSerializedAsynchronous() async throws {
        try await taskSerializer.add {
            try await self.example.asynchronousExample("3. serial async")
        }
    }
}

actor Example {
    func asynchronousExample(_ name: StaticString) async throws {
        let state = pointsOfInterest.beginInterval(name, id: pointsOfInterest.makeSignpostID())
        defer { pointsOfInterest.endInterval(name, state) }
        try await Task.sleep(for: .seconds(2))
    }

    func synchronousExample(_ name: StaticString) {
        let state = pointsOfInterest.beginInterval(name, id: pointsOfInterest.makeSignpostID())
        defer { pointsOfInterest.endInterval(name, state) }
        Thread.sleep(forTimeInterval: 2)
    }
}

actor SerialTasks<Success: Sendable> {
    private var previousTask: Task<Success, any Error>?

    func add(block: @Sendable @escaping () async throws -> Success) async throws -> Success {
        // Chain the new task onto the previous one, whether it succeeded or failed.
        let task = Task { [previousTask] in
            let _ = await previousTask?.result
            return try await block()
        }
        previousTask = task

        // Propagate cancellation of the caller to the chained task.
        return try await withTaskCancellationHandler {
            try await task.value
        } onCancel: {
            task.cancel()
        }
    }
}
With synchronous tasks (scenario 1), startSynchronous is the simplest: just call the synchronous method of the actor and you get serial execution.
With asynchronous tasks (scenario 2), startAsynchronous, if you have await suspension points, you lose sequential behavior due to actor reentrancy.
But you can refine that asynchronous task pattern (scenario 3) by having an actor, SerialTasks in the above code, that keeps track of the previous task, awaiting it before starting the next task. A subtle point is that the add method creates the new task and stores it in previousTask before it reaches its first await (although the closure it takes is asynchronous), so no other call can interleave on the actor while the chain is being updated. This avoids subtle races if you add multiple tasks.
Running the above in Instruments, we can graphically see the execution, with ⓢ signposts where tasks were initiated, and intervals showing when the tasks execute.
In short, if your actor is performing only synchronous tasks (which is your case), then the actor yields maxConcurrentOperationCount = 1 sort of behavior automatically. If the tasks are asynchronous, you simply need to await the prior tasks before starting the next one.
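As a quick sanity check of the "await the prior task" idea, here is a stripped-down sketch that can run without UIKit or signposts. TaskChain is a hypothetical, simplified variant of the chaining pattern (non-throwing, no cancellation handling), and Gauge and runTaskChainDemo() are likewise names invented for this example. The gauge counts how many jobs are in flight at once:

```swift
// Hypothetical simplified chain: each new task first awaits the previous one.
actor TaskChain {
    private var previousTask: Task<Void, Never>?

    func add(_ block: @escaping @Sendable () async -> Void) async {
        let task = Task { [previousTask] in
            _ = await previousTask?.value   // wait for the prior job, if any
            await block()
        }
        previousTask = task                 // recorded before the first await
        await task.value
    }
}

// Hypothetical helper that tracks how many jobs overlap.
actor Gauge {
    private var inFlight = 0
    private(set) var maxInFlight = 0
    func enter() {
        inFlight += 1
        maxInFlight = max(maxInFlight, inFlight)
    }
    func leave() { inFlight -= 1 }
}

func runTaskChainDemo() async -> Int {
    let chain = TaskChain()
    let gauge = Gauge()

    // Submit three asynchronous jobs from concurrently running child tasks.
    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<3 {
            group.addTask {
                await chain.add {
                    await gauge.enter()
                    try? await Task.sleep(nanoseconds: 50_000_000)
                    await gauge.leave()
                }
            }
        }
    }
    return await gauge.maxInFlight
}
```

Although the jobs are submitted concurrently and each one suspends, the chain lets only one run at a time, so the peak reported by the gauge is 1.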
Upvotes: 19