I'm trying to take advantage of Combine's ability to subscribe to an upstream Publisher on a different queue, but I'm finding that when I do, the upstream Publisher is not cancelled correctly.
The documentation for `subscribe(on:options:)` makes the following reference:

> Using `subscribe(on:options:)` also causes the upstream publisher to perform `cancel()` using the specified scheduler.
It's not entirely clear to me what this statement implies. In the example code below, I have a basic type `NameIterator` that conforms to `Sequence` and `IteratorProtocol`. It's just simulating a custom iterator that might take a long time to fetch elements.
Combine adds a `.publisher` property to `Sequence`. If you subscribe to that publisher on a single thread, any cancellation events are correctly propagated up to the root publisher and iteration stops.
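The single-threaded behavior can be pictured with a simplified model. This is a sketch under stated assumptions, not Combine's code: the hypothetical `LoopSubscription` pulls elements in a loop and checks a `cancelled` flag before each pull, and the downstream, like `first()`, cancels as soon as it has one value. Because that `cancel()` call runs synchronously inside the loop, the next check sees it and iteration stops. (Combine's real publisher appears to fetch one element ahead, which is why `Beta` also shows up in the output further down.)

```swift
import Foundation

// Simplified, single-threaded model of a sequence publisher's loop.
// Hypothetical type for illustration; this is not Combine's implementation.
final class LoopSubscription {
    private var cancelled = false
    private(set) var pulled: [String] = []

    func cancel() { cancelled = true }

    // Pull elements until the sequence ends or cancel() has been called.
    func run(_ names: [String], send: (String) -> Void) {
        for name in names {
            if cancelled { break }
            pulled.append(name)   // stands in for an expensive next()
            send(name)
        }
    }
}

let subscription = LoopSubscription()
var received: [String] = []

// Downstream behaves like first(): take one value, then cancel upstream.
// The cancel call runs synchronously, on the same thread as the loop.
subscription.run(["Alpha", "Beta", "Charlie", "Delta"]) { name in
    received.append(name)
    subscription.cancel()
}

print(subscription.pulled)   // ["Alpha"]
print(received)              // ["Alpha"]
```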
However, if you call `subscribe(on:)`, the cancellation event does not reach the root publisher until the iterator has been exhausted. Note that `sink` is called correctly in both cases, but the iterator keeps getting called even though no values are being pushed down the pipeline.
This is a problem if the iterator is expensive and yields a large number of items, as when traversing a filesystem. Is there a way to cancel the upstream publisher correctly? Or am I configuring the chain of publishers incorrectly?
Sample Iterator:
```swift
import Combine
import Foundation

struct NameIterator: Sequence, IteratorProtocol {
    private let names = ["Alpha", "Beta", "Charlie", "Delta"]
    private var index = 0

    mutating func next() -> String? {
        guard index < names.endIndex else {
            return nil
        }

        // Simulate an expensive iterator.
        sleep(1)

        defer { index = names.index(after: index) }
        print("NameIterator.next() called with \(names[index])")
        return names[index]
    }
}
```
Correctly Cancelled:
```swift
let cancellable = NameIterator().publisher
    .first()
    .sink { print("Sink: \($0)") }

// Output:
// NameIterator.next() called with Alpha
// NameIterator.next() called with Beta
// Sink: Alpha
```
Incorrectly Cancelled:
```swift
let cancellable = NameIterator().publisher
    .subscribe(on: DispatchQueue(label: "BackgroundQueue"))
    .first()
    .sink { print("Sink: \($0)") }

// Output:
// NameIterator.next() called with Alpha
// NameIterator.next() called with Beta
// Sink: Alpha
// NameIterator.next() called with Charlie   <- This call was unexpected.
// NameIterator.next() called with Delta     <- This call was unexpected.
```
---
The quote from the documentation is not a statement about cancellation; it's a statement about threading. It says: if you subscribe on a certain queue, then that is the queue that will be used when (and if) the time comes to send the cancellation message up the pipeline.
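To make the threading point concrete, here is a minimal sketch using a hypothetical `CancelOnQueue` type rather than Combine's own (private) subscription types. Its `cancel()` does not call the upstream directly; it dispatches the upstream's cancel onto the chosen queue and returns. The `DispatchSpecificKey` tag exists only so the sketch can report which queue the cancel ran on.

```swift
import Foundation

// Hypothetical wrapper, for illustration only: it forwards cancel() by
// dispatching the upstream's cancel onto a chosen queue, which is the
// behavior the documentation describes for subscribe(on:).
final class CancelOnQueue {
    private let queue: DispatchQueue
    private let upstreamCancel: () -> Void

    init(queue: DispatchQueue, upstreamCancel: @escaping () -> Void) {
        self.queue = queue
        self.upstreamCancel = upstreamCancel
    }

    func cancel() {
        // Returns immediately; the upstream cancel runs later, on `queue`.
        let work = upstreamCancel
        queue.async { work() }
    }
}

// Tag the background queue so we can tell where the cancel actually ran.
let key = DispatchSpecificKey<String>()
let background = DispatchQueue(label: "BackgroundQueue")
background.setSpecific(key: key, value: "BackgroundQueue")

let done = DispatchSemaphore(value: 0)
var cancelRanOn: String?

let subscription = CancelOnQueue(queue: background) {
    // Record which queue the upstream cancel ran on.
    cancelRanOn = DispatchQueue.getSpecific(key: key)
    done.signal()
}

subscription.cancel()   // called from the main thread
done.wait()
print(cancelRanOn ?? "some other queue")   // BackgroundQueue
```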
By choosing to subscribe on a specified queue, you are explicitly saying: when the time comes to cancel, queue that call. So, as with any action on a queue, we now have no idea when it will actually happen. In this particular case the queue is serial (a `DispatchQueue` created with just a label is serial), and it is still busy running the publisher's loop over the iterator, so the queued cancel call has to wait until that loop finishes. The idea that these calls should not have happened is thus wrong; there is no "should have" in the story. On the contrary, any expectation about when cancellation would percolate up to the publisher is exactly what you gave up when you subscribed on the queue.
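The timing can be reproduced without Combine. In this minimal sketch (a model, not Combine's implementation), a single block on a serial queue stands in for the sequence publisher's loop, and the downstream's cancel request is dispatched onto the same queue with `async`, which is what routing `cancel()` through the scheduler amounts to in this model. Because the queue is serial, the cancel block cannot start until the loop block returns, so every element is still pulled.

```swift
import Foundation

// Serial by default, like DispatchQueue(label: "BackgroundQueue") in the question.
let queue = DispatchQueue(label: "BackgroundQueue")
let names = ["Alpha", "Beta", "Charlie", "Delta"]
let done = DispatchSemaphore(value: 0)

// Only mutated on `queue`; read after `done.wait()`, which orders the accesses.
var events: [String] = []

// Stand-in for the upstream loop: one block that pulls every element.
queue.async {
    for name in names {
        Thread.sleep(forTimeInterval: 0.05)   // simulate an expensive next()
        events.append("next() -> \(name)")
        if name == "Alpha" {
            // Downstream (like first()) is done and asks to cancel, but the
            // request is queued behind the block that is still running.
            queue.async {
                events.append("cancel() ran")
                done.signal()
            }
        }
    }
}

done.wait()
events.forEach { print($0) }
// next() -> Alpha
// next() -> Beta
// next() -> Charlie
// next() -> Delta
// cancel() ran
```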
(Observe, by the way, that the completion arrives down the pipeline in good order at the expected moment: the sink gets the Alpha value followed immediately by the `.finished` completion. It is only the publisher that you have given all this extra leeway to.)