Reputation: 3090
According to Apple, receive(on:options:) runs callbacks on a given queue. We use a serial dispatch queue to prevent racing on localOptionalCancellable in the code below. But receiveCancel is not getting dispatched to that queue. Can someone tell me why?
From the documentation,
You use the receive(on:options:) operator to receive results and completion on a specific scheduler, such as performing UI work on the main run loop.
...
Prefer receive(on:options:) over explicit use of dispatch queues when performing work in subscribers. For example, instead of the following pattern:
import Foundation
import Combine

class Example {
    private var localOptionalCancellable: AnyCancellable?
    private let dispatchQueue = DispatchQueue(label: "LocalQueue-\(UUID())")

    func misbehavingFunction() {
        self.dispatchQueue.async {
            self.localOptionalCancellable = Just(())
                .setFailureType(to: Error.self)
                .receive(on: self.dispatchQueue)
                .handleEvents(
                    receiveCancel: {
                        // Simultaneous accesses to 0x600000364e10, but modification requires exclusive access.
                        // Can be fixed by wrapping in self.dispatchQueue.async {}
                        self.localOptionalCancellable = nil
                    }
                )
                .sink(
                    receiveCompletion: { _ in },
                    receiveValue: { _ in
                        self.localOptionalCancellable = nil
                    }
                )
        }
    }
}

Example().misbehavingFunction()
Simultaneous accesses to 0x600000364e10, but modification requires exclusive access.
Previous access (a modification) started at (0x10eeaf12a).
Current access (a modification) started at:
0 libswiftCore.dylib 0x00007fff2ff7be50 swift_beginAccess + 568
3 Combine 0x00007fff4ba73a40 Publishers.HandleEvents.Inner.cancel() + 71
4 Combine 0x00007fff4ba74230 protocol witness for Cancellable.cancel() in conformance Publishers.HandleEvents<A>.Inner<A1> + 16
5 Combine 0x00007fff4b9f10c0 Subscribers.Sink.cancel() + 652
6 Combine 0x00007fff4b9f1500 protocol witness for Cancellable.cancel() in conformance Subscribers.Sink<A, B> + 16
7 Combine 0x00007fff4b9dd2d0 AnyCancellable.cancel() + 339
8 Combine 0x00007fff4b9dd5f0 AnyCancellable.__deallocating_deinit + 9
9 libswiftCore.dylib 0x00007fff2ff7da20 _swift_release_dealloc + 16
13 Combine 0x00007fff4b9f0da0 Subscribers.Sink.receive(_:) + 54
14 Combine 0x00007fff4b9f14c0 protocol witness for Subscriber.receive(_:) in conformance Subscribers.Sink<A, B> + 16
15 Combine 0x00007fff4ba73ed0 Publishers.HandleEvents.Inner.receive(_:) + 129
16 Combine 0x00007fff4ba74170 protocol witness for Subscriber.receive(_:) in conformance Publishers.HandleEvents<A>.Inner<A1> + 16
17 Combine 0x00007fff4ba26440 closure #1 in Publishers.ReceiveOn.Inner.receive(_:) + 167
18 libswiftDispatch.dylib 0x000000010e97cad0 thunk for @escaping @callee_guaranteed () -> () + 14
19 libdispatch.dylib 0x00007fff20105323 _dispatch_call_block_and_release + 12
20 libdispatch.dylib 0x00007fff20106500 _dispatch_client_callout + 8
21 libdispatch.dylib 0x00007fff2010c12e _dispatch_lane_serial_drain + 715
22 libdispatch.dylib 0x00007fff2010cde1 _dispatch_lane_invoke + 403
23 libdispatch.dylib 0x00007fff20117269 _dispatch_workloop_worker_thread + 782
24 libsystem_pthread.dylib 0x00007fff6116391b _pthread_wqthread + 290
25 libsystem_pthread.dylib 0x00007fff61162b68 start_wqthread + 15
Fatal access conflict detected.
Upvotes: 0
Views: 1259
Reputation: 385890
According to Apple, receive(on:options:) runs callbacks on a given queue.
Not exactly. Here's what the documentation actually says:
You use the receive(on:options:) operator to receive results and completion on a specific scheduler, such as performing UI work on the main run loop. In contrast with subscribe(on:options:), which affects upstream messages, receive(on:options:) changes the execution context of downstream messages.
(Emphasis added.) So receive(on:) controls the Scheduler used to call a Subscriber's receive(_:) and receive(completion:) methods. It does not control the Scheduler used to call the Subscription's request(_:) or cancel() methods.
To control the Scheduler used to call the Subscription's cancel() method, you need to use the subscribe(on:options:) operator downstream of the handleEvents operator, like this:
self.localOptionalCancellable = Just(())
    .setFailureType(to: Error.self)
    .receive(on: self.dispatchQueue)
    .handleEvents(
        receiveCancel: {
            // Simultaneous accesses to 0x600000364e10, but modification requires exclusive access.
            // Can be fixed by wrapping in self.dispatchQueue.async {}
            self.localOptionalCancellable = nil
        }
    )
    .subscribe(on: self.dispatchQueue)
//  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    .sink(
        receiveCompletion: { _ in },
        receiveValue: { _ in
            self.localOptionalCancellable = nil
        }
    )
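If you want to check this yourself, here is a minimal sketch that reports whether the receiveCancel closure ran on the serial queue, with and without subscribe(on:). The function name cancelRunsOnQueue, the queue label, and the use of a PassthroughSubject in place of Just are assumptions made for illustration. They are not part of the original code. It assumes an Apple platform where Combine is available.

```swift
import Combine
import Dispatch

/// Returns true when the `receiveCancel` closure ran on the serial queue.
/// (Hypothetical helper for illustration only.)
func cancelRunsOnQueue(useSubscribeOn: Bool) -> Bool {
    let queue = DispatchQueue(label: "example.serial") // hypothetical label
    // Tag the queue so code can tell whether it is currently running on it.
    let key = DispatchSpecificKey<Bool>()
    queue.setSpecific(key: key, value: true)

    let done = DispatchSemaphore(value: 0)
    var ranOnQueue = false

    // A subject that never emits keeps the subscription alive until we cancel it.
    let upstream = PassthroughSubject<Void, Never>()
        .receive(on: queue)
        .handleEvents(receiveCancel: {
            ranOnQueue = DispatchQueue.getSpecific(key: key) == true
            done.signal()
        })

    let cancellable: AnyCancellable
    if useSubscribeOn {
        cancellable = upstream
            .subscribe(on: queue)
            .sink(receiveValue: { _ in })
    } else {
        cancellable = upstream.sink(receiveValue: { _ in })
    }

    // Let any subscribe work already scheduled on the serial queue finish first.
    queue.sync {}

    // Cancel from the calling thread, which is not the serial queue.
    cancellable.cancel()
    _ = done.wait(timeout: .now() + 2)
    return ranOnQueue
}

print(cancelRunsOnQueue(useSubscribeOn: false))
print(cancelRunsOnQueue(useSubscribeOn: true))
```

When called from a thread other than the serial queue, I would expect the first call to report false (the cancel runs synchronously on the caller) and the second to report true (subscribe(on:) reschedules the cancel onto the queue).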
Upvotes: 1