Reputation: 1934
With the following code, after I add receive(on: backgroundQueue), the receiveCompletion block is always called, but the receiveValue block is not always called.
xxxxPublisher
    .xxxx()
    .receive(on: backgroundQueue)
    .xxxx()
    .receive(on: DispatchQueue.main)
    .sink(receiveCompletion: { completion in
        // completion code
    }, receiveValue: { value in
        // receive value code
    })
    .store(in: &cancellables)
This doesn't seem like correct behavior. Are we not supposed to use receive<S>(on scheduler: S, options: S.SchedulerOptions? = nil) this way? Am I missing something?
Here is code that reproduces the problem. If you run it, you will see that receiveCompletion is called exactly 300 times, but receiveValue is called fewer than 300 times.
import Foundation
import Combine

private var cancellables: Set<AnyCancellable> = []
// Declared before the loop so they are initialized before first use.
var sinkCompletedIndices = Set<Int>()
var sinkOutputIndices = Set<Int>()

// Note: global queues are concurrent.
let backgroundQueue = DispatchQueue.global(qos: .background)

func runPublisher(_ index: Int) {
    [1].publisher
        .receive(on: backgroundQueue)
        .receive(on: DispatchQueue.main)
        .sink(receiveCompletion: { completion in
            NSLog("sink receiveCompletion")
            sinkCompletedIndices.insert(index)
        }, receiveValue: { value in
            NSLog("sink receiveValue")
            sinkOutputIndices.insert(index)
        })
        .store(in: &cancellables)
}

for i in 1...300 {
    runPublisher(i)
}

// After 2 seconds, report the indices that completed without delivering a value.
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 2) {
    let diff = sinkCompletedIndices.filter { !sinkOutputIndices.contains($0) }
    NSLog("Difference between completions and outputs \(diff)")
}

RunLoop.main.run()
Upvotes: 1
Views: 1968
Reputation: 14418
The problem is that Combine expects schedulers to operate as serial queues, but DispatchQueue.global is concurrent.
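To make the ordering issue concrete, here is a minimal sketch that uses only Dispatch, not Combine. It assumes, as a simplification, that a value and a completion reach the scheduler as two separately submitted blocks; it illustrates queue ordering and is not a description of Combine's internals. The names Recorder, finishOrder, and the queue labels are hypothetical and exist only for this example.

```swift
import Foundation

// Thread-safe log of the order in which blocks finish (hypothetical helper).
final class Recorder: @unchecked Sendable {
    private let lock = NSLock()
    private var events: [String] = []

    func record(_ event: String) {
        lock.lock()
        events.append(event)
        lock.unlock()
    }

    var snapshot: [String] {
        lock.lock()
        defer { lock.unlock() }
        return events
    }
}

// Submits a slow "value" block, then a fast "completion" block,
// and returns the order in which the two blocks finished.
func finishOrder(on queue: DispatchQueue) -> [String] {
    let recorder = Recorder()
    let group = DispatchGroup()

    queue.async(group: group) {
        Thread.sleep(forTimeInterval: 0.1) // pretend delivering the value is slow
        recorder.record("value")
    }
    queue.async(group: group) {
        recorder.record("completion")
    }

    group.wait()
    return recorder.snapshot
}

let serial = DispatchQueue(label: "demo.serial")
let concurrent = DispatchQueue(label: "demo.concurrent", attributes: .concurrent)

// A serial queue runs blocks one at a time in FIFO order.
print(finishOrder(on: serial)) // ["value", "completion"]

// A concurrent queue may run both blocks at once, so the order is not
// guaranteed; "completion" can finish before "value".
print(finishOrder(on: concurrent))
```

If a completion overtakes a value in this way, a subscriber that has already finished would have no reason to accept the late value, which is consistent with the missing receiveValue calls in the question.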
If you declare your background queue like this:
let backgroundQueue = DispatchQueue(label: "any name will do", qos: .background)
everything will work as expected.
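As a rough check, the reproduction from the question can be rerun with a serial queue and simple counters. This is a sketch under assumptions: the names serialQueue, valueCount, and completionCount and the queue label are made up for illustration, and the 2-second wait is borrowed from the question rather than being a guaranteed bound.

```swift
import Combine
import Foundation

// A queue created with a label and no .concurrent attribute is serial.
let serialQueue = DispatchQueue(label: "example.serial", qos: .background)

var cancellables = Set<AnyCancellable>()
var valueCount = 0
var completionCount = 0

for _ in 1...300 {
    [1].publisher
        .receive(on: serialQueue)
        .receive(on: DispatchQueue.main)
        .sink(receiveCompletion: { _ in
            completionCount += 1 // runs on the main queue
        }, receiveValue: { _ in
            valueCount += 1      // runs on the main queue
        })
        .store(in: &cancellables)
}

// Let the main run loop service the main queue for about 2 seconds.
RunLoop.main.run(until: Date().addingTimeInterval(2))

// With a serial queue the two counts are expected to match.
print("values: \(valueCount), completions: \(completionCount)")
```

Both counters are only touched from the main queue, so no extra locking is needed in this sketch.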
There is an interesting discussion about it on Swift forums:
https://forums.swift.org/t/runloop-main-or-dispatchqueue-main-when-using-combine-scheduler/26635
Upvotes: 8