Reputation: 785
I have this minimal example:
import UIKit
import Combine
var values = [1,2,3,4,5]
var cancel = values.publisher
.delay(for: 0.1, scheduler: DispatchQueue.global())
.print()
.flatMap() { i in
[i].publisher.first()
}
.sink { completion in
print("Received Completion: \(completion)")
} receiveValue: { v in
print("Received Value: \(v)")
}
My expectation is that the source publisher emits the values from 1 to 5 into the stream. Each number gets transformed into (just for the sake of it) a new publisher that emits exactly the first value and then completes. Since this is done with each number, I would expect that all values reach the sink. This is not the case, however. Output looks like this:
request unlimited
receive value: (1)
Received Value: 1
receive value: (2)
Received Value: 2
receive value: (4)
Received Value: 4
receive finished
Received Completion: finished
receive value: (3)
receive value: (5)
In fact, only 3 values reach the sink before the completion event arrives. Why is this? The documentation states:
successful completion of the new Publisher does not complete the overall stream.
Even more curious, when you replace .flatMap()
for .flatMap(maxPublishers: .max(1))
and add a .share()
to the original source publisher only the first value makes it to the sink.
Any pointers are much appreciated!
Upvotes: 1
Views: 858
Reputation: 385600
Your use of DispatchQueue.global()
is the problem. The values.publisher
sends all of its outputs, and its completion, downstream to the delay
operator as soon as values.publisher
receives the subscription. The delay
operator schedules six blocks (five for the output numbers and one for the completion) to run on DispatchQueue.global()
0.1 seconds later.
DispatchQueue.global()
is a concurrent queue. This means that it can run any number of blocks simultaneously. So there is no guarantee which of the six scheduled blocks will finish first.
It is in general a bad idea to use a concurrent queue as a Combine scheduler. You should use a serial queue instead. Here's a simple example:
var cancel = values.publisher
.delay(for: 0.1, scheduler: DispatchQueue(label: "my queue"))
.print()
.flatMap() { i in
[i].publisher.first()
}
.sink { completion in
print("Received Completion: \(completion)")
} receiveValue: { v in
print("Received Value: \(v)")
}
But you probably want to create the queue once and store it in a property, so you can use it with multiple publishers.
If you actually want the outputs to be delivered on the main queue (perhaps because you're going to use them to update a view), you should just use DispatchQueue.main
.
Upvotes: 8