Reputation: 36088
I have an actor
which throttles requests in a way where the first one will suspend subsequent requests until finished, then share its response with them so they don't have to make the same request.
Here's what I'm trying to do:
// Instances used by `execute()`: the response cache and the execution-status tracker.
let cache = Cache()
let operation = OperationStatus()
/// Sends the request unless another call has already marked the operation as executing,
/// in which case this call first waits for that execution to finish.
///
/// A cached response short-circuits the request. The function returns `Void`; a fresh
/// response is stored in `cache`.
///
/// NOTE(review): checking `isExecuting` and setting it are two separate `await`s, so
/// other calls can interleave between them; this alone does not guarantee that only a
/// single request is ever sent.
func execute() async {
    // Remember whether this call raised the flag, so that it also lowers it again.
    let startedExecution: Bool
    if await operation.isExecuting {
        await operation.waitUntilFinished()
        startedExecution = false
    } else {
        await operation.set(isExecuting: true)
        startedExecution = true
    }

    // Cache hit: nothing to send. Lower the flag if this call raised it; otherwise
    // every later caller would wait forever in `waitUntilFinished()`.
    if await cache.data != nil {
        if startedExecution {
            await operation.set(isExecuting: false)
        }
        return
    }

    let request = myRequest()
    let response = await myService.send(request)
    await cache.set(data: response)
    await operation.set(isExecuting: false)
}
/// Actor that holds the most recently stored response, if any.
actor Cache {
    /// The stored response; `nil` until a value is passed to `set(data:)`.
    var data: myResponse?

    /// Replaces the stored response with `newValue`.
    func set(data newValue: myResponse?) {
        data = newValue
    }
}
// Tracks whether an operation is currently executing and lets callers suspend until it is not.
actor OperationStatus {
// Published so that `waitUntilFinished()` can observe changes through `$isExecuting`.
@Published var isExecuting = false
// Holds the Combine subscriptions created by `waitUntilFinished()`.
// NOTE(review): nothing in this actor removes entries again — confirm whether that is intended.
private var cancellable = Set<AnyCancellable>()
// Updates the flag; subscribers of `$isExecuting` observe the new value.
func set(isExecuting: Bool) {
self.isExecuting = isExecuting
}
// Returns immediately when nothing is executing; otherwise suspends until
// `$isExecuting` emits `false`, at which point the continuation is resumed.
func waitUntilFinished() async {
guard isExecuting else { return }
return await withCheckedContinuation { continuation in
$isExecuting
.first { !$0 } // Wait until execution toggled off
.sink { _ in continuation.resume() }
.store(in: &cancellable)
}
}
}
// Do something
// NOTE(review): `execute()` is declared `async`, yet it is called here from a synchronous
// closure without `await` or a wrapping `Task` — confirm how this is meant to run.
DispatchQueue.concurrentPerform(iterations: 1_000_000) { _ in execute() }
This ensures one request at a time, and subsequent calls wait until it has finished. It seems to work, but I am wondering whether there is a pure Swift concurrency way instead of mixing Combine
in, and how I can test this? Here's a test I started but I'm confused how to test this:
// Stress test that fires many concurrent `execute()` calls against one `OperationStatus`.
final class OperationStatusTests: XCTestCase {
private let iterations = 10_000 // 1_000_000
private let outerIterations = 10
// Actor-protected counter, incremented by the calls that perform the work themselves.
actor Storage {
var counter: Int = 0
func increment() {
counter += 1
}
}
func testConcurrency() {
// Given
let storage = Storage()
let operation = OperationStatus()
let promise = expectation(description: "testConcurrency")
// One fulfillment is expected per `execute()` call.
promise.expectedFulfillmentCount = outerIterations * iterations
// Either waits for an in-flight execution, or marks itself as executing, sleeps and
// increments `storage`. Both paths fulfill `promise`; only the second one increments.
@Sendable func execute() async {
guard await !operation.isExecuting else {
await operation.waitUntilFinished()
promise.fulfill()
return
}
await operation.set(isExecuting: true)
try? await Task.sleep(seconds: 8)
await storage.increment()
await operation.set(isExecuting: false)
promise.fulfill()
}
// NOTE(review): this wait runs before any of the tasks below has been created —
// confirm the intended order.
waitForExpectations(timeout: 10)
// When
DispatchQueue.concurrentPerform(iterations: outerIterations) { _ in
(0..<iterations).forEach { _ in
Task { await execute() }
}
}
// Then
// XCTAssertEqual... how to test?
}
}
Upvotes: 2
Views: 1140
Reputation: 437552
Before I tackle a more general example, let us first dispense with some natural examples of sequential execution of asynchronous tasks, passing the result of one as a parameter of the next. Consider:
/// Runs `first()` and then three `subsequent(with:)` steps strictly in order,
/// feeding each result into the next call.
func entireProcess() async throws {
    let initial = try await first()
    let second = try await subsequent(with: initial)
    let third = try await subsequent(with: second)
    let fourth = try await subsequent(with: third)
    // do something with `fourth`
}
Or
/// Starts from `first()` and applies `subsequent(with:)` four times, each time
/// passing in the result of the previous call.
func entireProcess() async throws {
    var latest = try await first()
    for _ in 0 ..< 4 {
        latest = try await subsequent(with: latest)
    }
    // do something with `latest`
}
This is the easiest way to declare a series of async
functions, each of which takes the prior result as the input for the next iteration. So, let us expand the above to include some signposts for Instruments’ “Points of Interest” tool:
import os.log
/// Signpost log for subsystem "Test", using the `.pointsOfInterest` category.
private let log = OSLog(subsystem: "Test", category: .pointsOfInterest)
/// Runs `first()` followed by four `subsequent(with:)` steps, emitting signposts
/// so the sequence can be inspected in Instruments.
///
/// A begin/end interval brackets the whole process, and an event signpost is
/// emitted right before each subsequent step is awaited.
func entireProcess() async throws {
    let id = OSSignpostID(log: log)
    os_signpost(.begin, log: log, name: #function, signpostID: id, "start")

    var value = try await first()
    // The loop index is bound to `i` because the event message below reports it.
    for i in 0 ..< 4 {
        os_signpost(.event, log: log, name: #function, "Scheduling: %d with input of %d", i, value)
        value = try await subsequent(with: value)
    }

    os_signpost(.end, log: log, name: #function, signpostID: id, "%d", value)
}
/// Simulates the initial asynchronous step: sleeps via `Task.sleep(seconds: 1)`,
/// then returns `42`.
///
/// The work is bracketed by a signpost interval; the end signpost logs the result.
///
/// - Returns: The constant `42`.
/// - Throws: Whatever `Task.sleep(seconds:)` throws; no end signpost is emitted then.
func first() async throws -> Int {
    let signpostID = OSSignpostID(log: log)
    os_signpost(.begin, log: log, name: #function, signpostID: signpostID, "start")

    try await Task.sleep(seconds: 1)

    let answer = 42
    os_signpost(.end, log: log, name: #function, signpostID: signpostID, "%d", answer)
    return answer
}
/// Simulates a dependent asynchronous step: sleeps via `Task.sleep(seconds: 1)`,
/// then returns `value + 1`.
///
/// The work is bracketed by a signpost interval; the begin signpost logs the input
/// and the end signpost logs the result.
///
/// - Parameter value: Result of the preceding step.
/// - Returns: `value + 1`.
/// - Throws: Whatever `Task.sleep(seconds:)` throws; no end signpost is emitted then.
func subsequent(with value: Int) async throws -> Int {
    let id = OSSignpostID(log: log)
    os_signpost(.begin, log: log, name: #function, signpostID: id, "%d", value)

    try await Task.sleep(seconds: 1)

    let newValue = value + 1
    // Emitted inline rather than from a `defer` placed right before `return`: the timing
    // is the same, and `#function` is evaluated in the function body itself, so the
    // begin and end signposts are guaranteed to carry the same name.
    os_signpost(.end, log: log, name: #function, signpostID: id, "%d", newValue)
    return newValue
}
So, there you see a series of requests that pass their result to the subsequent request. All of that os_signpost
signpost stuff is there so we can visually confirm that they run sequentially in Instruments’ “Points of Interest” tool:
You can see ⓢ
event signposts as each task is scheduled, and the intervals illustrate the sequential execution of these asynchronous tasks.
This is the easiest way to have dependencies between tasks, passing values from one task to another.
Now, that raises the question of how to generalize the above, where we await the prior task before starting the next one.
One pattern is to write an actor that awaits the result of the prior one. Consider:
/// Actor that chains submitted blocks so that each one starts only after the
/// previously submitted one has finished.
actor SerialTasks<Success> {
    /// The most recently submitted task; the next submission awaits it.
    private var previousTask: Task<Success, Error>?

    /// Appends `block` to the chain.
    ///
    /// Deliberately synchronous: the new task is created and stored without any
    /// suspension point in between.
    ///
    /// - Parameter block: The work to run once the previously added block has finished.
    func add(block work: @Sendable @escaping () async throws -> Success) {
        previousTask = Task { [previousTask] in
            // Wait for the predecessor; its value or error is intentionally ignored.
            _ = await previousTask?.result
            return try await work()
        }
    }
}
Unlike the previous example, this does not require that you have a single function from which you initiate the subsequent tasks. E.g., I have used the above when some separate user interaction requires me to add a new task to the end of the list of previously submitted tasks.
There are two subtle, yet critical, aspects of the above actor:
The add
method, itself, must not be an asynchronous function. We need to avoid actor reentrancy. If this were an async
function (like in your example), we would lose the sequential execution of the tasks.
The Task
has a [previousTask]
capture list to capture a copy of the prior task. This way, each task will await
the prior one, avoiding any races.
The above can be used to make a series of tasks run sequentially. But it does not, itself, pass values between tasks. I confess that I have used this pattern where I simply need sequential execution of largely independent tasks (e.g., separate commands being sent to some Process
). But it can probably be adapted for your scenario, in which you want to “share its response with [subsequent requests]”.
I would suggest that you post a separate question with an MCVE, with a practical example of precisely what you want to pass from one asynchronous function to another. I have, for example, done a permutation of the above, passing an integer from one task to another. But in practice, that is not of great utility, as it gets more complicated when you start dealing with the reality of parsing heterogeneous results. In practice, the simple example with which I started this answer is the most practical pattern.
On the broader question of working with/around actor reentrancy, I would advise keeping an eye out on SE-0306 - Future Directions which explicitly contemplates some potential elegant forthcoming alternatives. I would not be surprised to see some refinements, either in the language itself, or in the Swift Async Algorithms library.
tl;dr
I did not want to encumber the above with discussion regarding your code snippets, but there are quite a few issues. So, if you forgive me, here are some observations:
The attempt to use OperationStatus
to enforce sequential execution of async
calls will not work because actors feature reentrancy. If you have an async
function, every time you hit an await
, that is a suspension point at which point another call to that async function is allowed to proceed. The integrity of your OperationStatus
logic will be violated. You will not experience serial behavior.
If you are interested in suspension points, I might recommend watching the WWDC 2021 video Swift concurrency: Behind the scenes.
The testConcurrency
is calling waitForExpectations
before it actually starts any tasks that will fulfill any expectations. That will always time out.
The testConcurrency
is using GCD concurrentPerform
, which, in turn, just schedules an asynchronous task and immediately returns. That defeats the entire purpose of concurrentPerform
(which is a throttling mechanism for running a series of synchronous tasks in parallel without exceeding the maximum number of cores on your CPU). Besides, Swift concurrency features its own analog to concurrentPerform
, namely the constrained “cooperative thread pool” (also discussed in that video, IIRC), rendering concurrentPerform
obsolete in the world of Swift concurrency.
Bottom line, it doesn't make sense to include concurrentPerform
in a Swift concurrency codebase. It also does not make sense to use concurrentPerform
to launch asynchronous tasks (whether Swift concurrency or GCD). It is for launching a series of synchronous tasks in parallel.
In execute
in your test, you have two paths of execution, one which will await some state change and fulfills the expectation without ever incrementing the storage. That means that you will lose some attempts to increment the value. Your total will not match the desired resulting value. Now, if your intent was to drop requests if another was pending, that's fine. But I don't think that was your intent.
In answer to your question about how to test success at the end. You might do something like:
/// Actor-isolated counter that can be incremented and read back.
actor Storage {
    /// Number of times `increment()` has been called.
    private var counter = 0

    /// The current count.
    var value: Int {
        return counter
    }

    /// Adds one to the count.
    func increment() {
        counter = counter + 1
    }
}
/// Launches `outerIterations * iterations` unstructured tasks that each call `execute()`,
/// waits until every call has fulfilled the expectation, and then compares the number of
/// increments recorded in `storage` with the number of calls.
func testConcurrency() async {
    // Given
    let storage = Storage()
    let operation = OperationStatus()
    let finalCount = outerIterations * iterations
    let promise = expectation(description: "testConcurrency")
    promise.expectedFulfillmentCount = finalCount

    @Sendable func execute() async {
        if await operation.isExecuting {
            // Another call is marked as executing: wait for it instead of doing the work.
            await operation.waitUntilFinished()
        } else {
            await operation.set(isExecuting: true)
            try? await Task.sleep(seconds: 1)
            await storage.increment()
            await operation.set(isExecuting: false)
        }
        // Both paths report completion exactly once.
        promise.fulfill()
    }

    // When
    // waitForExpectations(timeout: 10) // this is not where you want to wait; moved below, after the tasks started
    // DispatchQueue.concurrentPerform(iterations: outerIterations) { _ in // no point in this
    for _ in 0 ..< outerIterations {
        for _ in 0 ..< iterations {
            Task { await execute() }
        }
    }
    await waitForExpectations(timeout: 10)

    // Then
    // Reading the count requires `await`, which is why this test method is `async`.
    let value = await storage.value
    XCTAssertEqual(finalCount, value, "Count")
}
Now, this test will fail for a variety of reasons, but hopefully this illustrates how you would verify the success or failure of the test. But, note, that this will test only that the final result was correct, not that they were executed sequentially. The fact that Storage
is an actor will hide the fact that they were not really invoked sequentially. I.e., whether the result of one request was really available in time to prepare the next is not tested here.
If, as you go through this, you want to really confirm the behavior of your OperationStatus
pattern, I would suggest using os_signpost
intervals (or simple logging statements where your tasks start and finish). You will see that the separate invocations of the asynchronous execute
method are not running sequentially.
Upvotes: 3