Mattie

Reputation: 3028

Running a Combine chain in a thread-safe way

I have an existing flow implemented using Operation subclasses and an OperationQueue. I'm trying to reimplement it in Combine as a learning exercise.

The current version, simplified, looks like this:

func testFunction(completionHandler: @escaping (Result) -> Void) {
    let op = LongRunningOperation(object: self.stateObject)

    op.completionHandler = { [unowned op] in
        let result = op.result
        self.stateObject.update(with: result)

        completionHandler(result)
    }

    localSerialQueue.addOperation(op)
}

Some important aspects: LongRunningOperation is asynchronous. It may need to call out to the main queue to complete the login process, depending on stateObject. stateObject is only ever accessed on localSerialQueue. Also, just to be really explicit, there could be other concurrent calls to testFunction, so the serialization provided by the queue + operation is essential.

The combination of mutable state and potential concurrent invocations is absolutely critical to the problem.

To illustrate the issue, I've produced an example which proves that Combine pipelines are not executed atomically. Any other result would have surprised me, but I just wanted to verify. Multiple threads can be executing various stages of the pipeline simultaneously, violating our precondition checks.

import Combine
import Foundation

class MyThreadUnsafeObject {
    private var cancellables = Set<AnyCancellable>()
    private var inProcess = false

    private func testFunction() {
        Result<Bool, Error> {
            // begin our pipeline, checking our invariant
            // and mutating the state to start our process
            precondition(self.inProcess == false)

            self.inProcess = true

            // just pass some dummy data through
            return true
        }
        .publisher
        .flatMap { (result: Bool) -> AnyPublisher<Bool, Error> in
            Future { (promise) in
                precondition(self.inProcess)

                // simulate a very simple long-running task here,
                // which is not even asynchronous, just to make things
                // really clear.

                // critically, change some mutable state only
                // when done

                precondition(self.inProcess)
                sleep(2)
                precondition(self.inProcess)

                promise(.success(true))
            }.eraseToAnyPublisher()
        }
        .sink(receiveCompletion: { completion in
            if case .failure = completion {
                fatalError("this isn't allowed to happen")
            }
        }, receiveValue: { result in
            precondition(result == true)
            precondition(self.inProcess)

            // back to the initial state
            self.inProcess = false
        })
        .store(in: &self.cancellables)
    }

    func runTest() {
        let group = DispatchGroup()

        for _ in 0..<20 {
            DispatchQueue.global().async(group: group, qos: .default, flags: .inheritQoS) {
                self.testFunction()
            }
        }

        group.wait()
    }
}

Just to be extremely clear, my question is not about how to build a Combine pipeline. It is about the semantics of Combine's synchronization primitives, and how I can protect mutable state that is accessed within a pipeline. And it could be that the solution is to use Combine with another form of synchronization (like a lock) to handle this kind of access pattern. That's totally fine! I'm just curious if others have run into this kind of issue, if there is a more idiomatic way, or if I can reframe the problem somehow.
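For reference, here's the kind of lock-based guard I could imagine pairing with a pipeline. This is just a minimal sketch, and the names (StateGuard, beginIfIdle) are placeholders I made up, not anything from my real code:

```swift
import Foundation

// Sketch: protect the "in process" flag with a lock instead of relying
// on an OperationQueue for serialization.
final class StateGuard {
    private let lock = NSLock()
    private var inProcess = false

    // Returns true only for the one caller that wins the race.
    func beginIfIdle() -> Bool {
        lock.lock()
        defer { lock.unlock() }
        guard !inProcess else { return false }
        inProcess = true
        return true
    }

    func end() {
        lock.lock()
        defer { lock.unlock() }
        precondition(inProcess)
        inProcess = false
    }
}
```

A pipeline stage would call beginIfIdle() before starting work and end() in the sink; a caller that gets false back would have to retry or queue the work itself, which is exactly the part the OperationQueue currently handles for me.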

Upvotes: 2

Views: 3310

Answers (2)

Marin Todorov

Reputation: 6397

You can still use synchronization primitives you already know well, such as dispatch queues and run loops, together with Combine code.

Combine (like RxSwift and other async frameworks) uses a scheduler to control the execution of each operator's code, i.e. running all the chained closures isn't a free-for-all; there is a "controller" that schedules and controls the execution. You can choose which scheduler you want by inserting an operator that sets it. In that sense the model is exactly the same as using GCD, operations, etc.: you'd like to do the heavy work on a background queue and perform the side effects on the main queue to update your UI.

Luckily, RunLoop and DispatchQueue are schedulers themselves: you can define the concurrent or serial queues you'd like to use for code execution and switch the pipeline to whichever one you want (see the "Conforms To" section of the DispatchQueue documentation).

Here's an example that starts any subscription on a concurrent background queue and updates the UI on the main thread, I think this should be enough to find your way in your code:

import Combine
import Foundation

let myBgQueue = DispatchQueue(
    label: "concurrent",
    qos: .default,
    attributes: .concurrent
)

let sub = (0...5).publisher
    .subscribe(on: myBgQueue)
    .flatMap({ el -> AnyPublisher<Int, Never> in
        Future { (promise) in
            precondition(!Thread.isMainThread)
            sleep(2)

            print("Future #\(el)")
            promise(.success(el))
        }.eraseToAnyPublisher()
    })
    .receive(on: DispatchQueue.main)
    .sink(receiveCompletion: { _ in
        print("Done.")
    }, receiveValue: { el in
        precondition(Thread.isMainThread)
        print("Got #\(el)")
    })

The console output is as follows (no precondition crashes, which verifies the code runs on the correct queues):

Future #0
Got #0
Future #1
Got #1
Future #2
Got #2
Future #3
Got #3
Future #4
Got #4
Future #5
Got #5
Done.
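Note that myBgQueue above is concurrent, so it doesn't by itself serialize access to shared state. If you want the serialized behavior of your original OperationQueue, a serial dispatch queue works as a Scheduler in the same way. Here's a sketch (the queue label is arbitrary, and I collect values into an array instead of updating UI):

```swift
import Combine
import Foundation

// A DispatchQueue is serial by default, so every pipeline stage that
// runs on it executes one at a time.
let serialQueue = DispatchQueue(label: "pipeline.serial")

var results: [Int] = []

let sub = (1...3).publisher
    .subscribe(on: serialQueue)
    .map { value -> Int in
        // Runs on serialQueue: state confined to this queue is safe here.
        dispatchPrecondition(condition: .onQueue(serialQueue))
        return value * 10
    }
    .sink { results.append($0) }

// Drain the serial queue so the synchronous emissions have finished.
serialQueue.sync {}
print(results)  // [10, 20, 30]
```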

Some time ago I wrote a blog post about receive(on:) and subscribe(on:) if you want to read more: https://trycombine.com/posts/subscribe-on-receive-on/

Upvotes: 3

matt

Reputation: 535606

The whole point of Combine is that it lines up steps one after another. One step cannot execute until the previous step has signalled that it has executed (by passing a value down the pipeline). Thus merely using Combine makes things "atomic" in your sense; that is the entire point.

This has nothing to do with queues. You can specify a dispatch queue, or switch dispatch queues in the course of the pipeline, but that doesn't even really matter; switching queues is just another step on the way down the pipeline.

The code you have shown is pretty much nonsense and most of it is unnecessary. You don't call one publisher in the middle of another. You don't call store in the middle of a pipeline. You don't usually need to call subscribe. You don't call receive(on:) unless you are switching threads (dispatch queues actually).

So, you construct one pipeline. It starts with a publisher and after that there is a series of operators. Then, at the very end, there is one subscriber (sink or assign) and one store(in:) to keep it all alive, and that's it.

There are operators for describing every situation and topology you can think of. In particular (since this is the part you seem most puzzled about), there are various ways of handling asynchronicity in the middle of the pipeline, depending on what your goal is. Here are a few of them:

  • If, in the middle of the pipeline, you need another publisher to start only after the whole upstream pipeline has produced a value, you use the flatMap operator to bring that publisher into existence in response to receipt of that value.

  • Or, if the idea is that one publisher should start only after another has finished, but you do not need to pass anything from the first to the second, use the append operator.

  • Or, if it's okay for two publishers to operate independently but you cannot proceed until they have both published, use the zip operator.

And so on; there are many, many more like that.
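To make those three concrete, here's a toy sketch using synchronous sequence publishers (the values are arbitrary):

```swift
import Combine

// flatMap: a second publisher comes into existence only in response
// to a value arriving from upstream.
var flatMapped: [String] = []
let c1 = [1, 2].publisher
    .flatMap { n in Just("item \(n)") }
    .sink { flatMapped.append($0) }
// flatMapped == ["item 1", "item 2"]

// append: the second publisher starts only after the first finishes,
// and nothing is passed from the first to the second.
var appended: [Int] = []
let c2 = [1, 2].publisher
    .append([3, 4].publisher)
    .sink { appended.append($0) }
// appended == [1, 2, 3, 4]

// zip: the pipeline proceeds only once both publishers have published.
var zipped: [Int] = []
let c3 = [1, 2, 3].publisher
    .zip([10, 20, 30].publisher)
    .sink { zipped.append($0.0 + $0.1) }
// zipped == [11, 22, 33]
```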

Upvotes: 2
