Reputation: 216
I'm observing unexpected behavior with CombineLatest: if the inner publishers use subscribe(on:), the CombineLatest stream does not emit any values.
Notes:
import Combine
import Foundation

func makePublisher() -> AnyPublisher<Int, Never> {
    // Deferred postpones creating the Future until a subscriber attaches.
    Deferred {
        Future { promise in
            DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 3) {
                promise(.success(Int.random(in: 0...3)))
            }
        }
    }
    .subscribe(on: DispatchQueue.global())
    .receive(on: DispatchQueue.main)
    .eraseToAnyPublisher()
}

var cancellables = Set<AnyCancellable>()

Publishers.CombineLatest(
    makePublisher(),
    makePublisher()
)
.sink { completion in
    print(completion)
} receiveValue: { (a, b) in
    print(a, b)
}.store(in: &cancellables)
Is this a Combine bug or expected behavior? Do you have any idea how to set up this kind of stream so that the inner publishers can define their own subscribe scheduler?
Upvotes: 3
Views: 748
Reputation: 543
Apply this workaround whenever you don't see emission events:
extension Publisher {
    // https://stackoverflow.com/questions/68415286/combinelatest-operator-is-not-emitting-when-inners-publishers-use-subscribeon
    public func fixCombineLatestBug() -> AnyPublisher<Output, Failure> {
        map({ CurrentValueSubject<Output, Failure>($0) })
            .switchToLatest()
            .eraseToAnyPublisher()
    }
}

Publishers.CombineLatest(
    makePublisher().fixCombineLatestBug(),
    makePublisher().fixCombineLatestBug()
)
Upvotes: 0
Reputation: 385610
Yes, it's a bug. We can simplify the test case to this:
import Combine
import Dispatch

let pub = Just("x")
    .subscribe(on: DispatchQueue.main)

let ticket = pub.combineLatest(pub)
    .sink(
        receiveCompletion: { print($0) },
        receiveValue: { print($0) })
This never prints anything. But if you comment out the subscribe(on:) operator, it prints what's expected. If you leave subscribe(on:) in, but insert some print() operators, you'll see that the CombineLatest operator never sends any demand upstream.
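For reference, here is a minimal sketch of where the print() operators could go in the simplified test case. The "upstream" and "downstream" prefixes are just illustrative labels, and this assumes an environment where the main queue is being serviced (a playground or a running app); in a bare command-line tool, work scheduled on DispatchQueue.main would not run at all.

```swift
import Combine
import Dispatch

let pub = Just("x")
    // Logs subscription, demand, and value events seen by Just.
    .print("upstream")
    .subscribe(on: DispatchQueue.main)
    // Logs the events exchanged between subscribe(on:) and CombineLatest.
    .print("downstream")

let ticket = pub.combineLatest(pub)
    .sink(
        receiveCompletion: { print($0) },
        receiveValue: { print($0) })
```

Comparing the log with and without the subscribe(on:) line should show whether a "request" event ever reaches each branch.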
I suggest you copy the CombineX reimplementation of CombineLatest and the utilities it needs to compile (the CombineX implementations of Lock and LockedAtomic, I think). I don't know whether the CombineX version works either, but if it's buggy, at least you have the source and can try to fix it.
Upvotes: 5