CombineLatest operator is not emitting when inner publishers use subscribe(on:)

I'm observing unexpected behavior with CombineLatest: if the inner publishers use subscribe(on:), the CombineLatest stream does not emit any values.

Sample code:

    import Combine
    import Foundation

    func makePublisher() -> AnyPublisher<Int, Never> {
        Deferred {
            Future { promise in
                DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 3) {
                    promise(.success(Int.random(in: 0...3)))
                }
            }
        }
        .subscribe(on: DispatchQueue.global())
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher()
    }
    
    var cancellables = Set<AnyCancellable>()
    Publishers.CombineLatest(
        makePublisher(),
        makePublisher()
    )
    .sink { completion in
        print(completion)
    } receiveValue: { (a, b) in
        print(a, b)
    }.store(in: &cancellables)

Is this a Combine bug or expected behavior? Do you have any idea how to set up this kind of stream, where each inner publisher can define its own subscribe scheduler?

Upvotes: 3

Views: 748

Answers (2)

ImpactZero

Reputation: 543

Apply this workaround to each inner publisher whenever you don't see emission events.

import Combine

extension Publisher {
    // https://stackoverflow.com/questions/68415286/combinelatest-operator-is-not-emitting-when-inners-publishers-use-subscribeon
    // Wraps each upstream value in a CurrentValueSubject and flattens it again with switchToLatest().
    public func fixCombineLatestBug() -> AnyPublisher<Output, Failure> {
        map { CurrentValueSubject<Output, Failure>($0) }
            .switchToLatest()
            .eraseToAnyPublisher()
    }
}

Publishers.CombineLatest(
    makePublisher().fixCombineLatestBug(),
    makePublisher().fixCombineLatestBug()
)

Upvotes: 0

rob mayoff

Reputation: 385610

Yes, it's a bug. We can simplify the test case to this:

import Combine
import Dispatch

let pub = Just("x")
    .subscribe(on: DispatchQueue.main)

let ticket = pub.combineLatest(pub)
    .sink(
        receiveCompletion: { print($0) },
        receiveValue: { print($0) })

This never prints anything. But if you comment out the subscribe(on:) operator, it prints what's expected. If you leave subscribe(on:) in, but insert some print() operators, you'll see that the CombineLatest operator never sends any demand upstream.
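To check the demand yourself, here is a minimal sketch that records subscription and demand events between each inner publisher and CombineLatest. It uses handleEvents instead of print() so the events can be collected into a log. The names `probe` and `instrumented` are hypothetical helpers made up for this example, and the half-second run loop wait is an arbitrary choice that assumes the code runs on the main thread.

```swift
import Combine
import Foundation

/// Runs the simplified test case and returns a log of what happened.
/// `probe` is a hypothetical helper, not part of Combine.
func probe(useSubscribeOn: Bool) -> [String] {
    var log: [String] = []

    // Builds one inner publisher and records the subscription and demand
    // events that pass between it and CombineLatest.
    func instrumented(_ label: String) -> AnyPublisher<String, Never> {
        let base = Just("x")
        let upstream: AnyPublisher<String, Never> = useSubscribeOn
            ? base.subscribe(on: DispatchQueue.main).eraseToAnyPublisher()
            : base.eraseToAnyPublisher()
        return upstream
            .handleEvents(
                receiveSubscription: { _ in log.append("\(label): subscription") },
                receiveRequest: { demand in log.append("\(label): request \(demand)") })
            .eraseToAnyPublisher()
    }

    let ticket = instrumented("a").combineLatest(instrumented("b"))
        .sink(
            receiveCompletion: { log.append("completion \($0)") },
            receiveValue: { log.append("value \($0)") })

    // Give any work scheduled on the main queue a chance to run.
    RunLoop.main.run(until: Date().addingTimeInterval(0.5))
    ticket.cancel()
    return log
}

print(probe(useSubscribeOn: false))
print(probe(useSubscribeOn: true))
```

If the behavior described above reproduces on your OS version, the second log should contain no "request" entries and no values, while the first should contain both. Results may differ on releases where the behavior has changed.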

I suggest you copy the CombineX reimplementation of CombineLatest and the utilities it needs to compile (the CombineX implementations of Lock and LockedAtomic, I think). I don't know that the CombineX version works either, but if it's buggy, at least you have the source and can try to fix it.

Upvotes: 5
