Reputation: 45
I have some code like this
func a() -> AnyPublisher<Void, Never> {
Future<Void, Never> { promise in
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
print(1)
promise(.success(()))
}
}
.eraseToAnyPublisher()
}
func b() -> AnyPublisher<Void, Never> {
Future<Void, Never> { promise in
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
print(2)
promise(.success(()))
}
}
.eraseToAnyPublisher()
}
var tempBag = Set<AnyCancellable>()
let subject = CurrentValueSubject<Int, Never>(1)
subject
.flatMap({ _ in a() })
.flatMap({ _ in b() })
.print()
.sink(receiveCompletion: { _ in
tempBag.removeAll()
}, receiveValue: { })
.store(in: &tempBag)
So, I have some uncompletable subject in the root of the stream and some completable publishers in flatMap operator. I want the overall stream to complete when the last flatMap's publisher completes. So, I want the console to look like this:
receive subscription: (FlatMap)
request unlimited
1
2
receive value: (())
receive finished
but actual result is
receive subscription: (FlatMap)
request unlimited
1
2
receive value: (())
How can I achieve this?
Upvotes: 1
Views: 1404
Reputation: 23701
The problem you are having is that your Subject
(the CurrentValueSubject
) never finishes so the entire chain never completes. What you need is a publisher that emits a single value then completes at the top of your sequence, and an intermediate that waits until all the publishers it is tracking complete before finishing itself.
You already have a publisher that does one thing then completes... it's returned by a()
. To wait until both a()
and b()
complete you can use combineLatest
since a the publisher it creates won't finish until all the publishers it combines finish. The whole thing looks like:
a()
.combineLatest(b())
.print()
.sink(receiveCompletion: { _ in
tempBag.removeAll()
}, receiveValue: { _ in () })
.store(in: &tempBag)
with the output
receive subscription: (CombineLatest)
request unlimited
1
2
receive value: (((), ()))
receive finished
Upvotes: 2