Andrii Moisol

Reputation: 45

How do I make Combine's flatMap complete the overall stream?

I have some code like this:

import Combine
import Foundation

func a() -> AnyPublisher<Void, Never> {
    Future<Void, Never> { promise in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            print(1)
            promise(.success(()))
        }
    }
    .eraseToAnyPublisher()
}

func b() -> AnyPublisher<Void, Never> {
    Future<Void, Never> { promise in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            print(2)
            promise(.success(()))
        }
    }
    .eraseToAnyPublisher()
}

var tempBag = Set<AnyCancellable>()

let subject = CurrentValueSubject<Int, Never>(1)

subject
    .flatMap({ _ in a() })
    .flatMap({ _ in b() })
    .print()
    .sink(receiveCompletion: { _ in
        tempBag.removeAll()
    }, receiveValue: {  })
    .store(in: &tempBag)

So I have a subject that never completes at the root of the stream, and publishers that do complete inside the flatMap operators. I want the overall stream to complete when the last flatMap's publisher completes, so that the console looks like this:

receive subscription: (FlatMap)
request unlimited
1
2
receive value: (())
receive finished

but the actual result is:

receive subscription: (FlatMap)
request unlimited
1
2
receive value: (())

How can I achieve this?

Upvotes: 1

Views: 1404

Answers (1)

Scott Thompson

Reputation: 23701

The problem you are having is that your Subject (the CurrentValueSubject) never finishes, so the entire chain never completes. A flatMap only finishes once its upstream has finished and every inner publisher it created has finished. What you need is a publisher at the top of your sequence that emits a single value and then completes, and an intermediate operator that waits until all the publishers it is tracking complete before finishing itself.
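To see that completion rule in isolation, here is a minimal sketch. It is not your code: the PassthroughSubject, the Just inner publisher, and the events array are stand-ins chosen so that everything runs synchronously.

```swift
import Combine

// Hypothetical log of what the sink observes.
var events: [String] = []
let source = PassthroughSubject<Int, Never>()

let cancellable = source
    .flatMap { value in Just(value * 10) }   // inner publisher completes immediately
    .sink(receiveCompletion: { _ in events.append("finished") },
          receiveValue: { events.append("value \($0)") })

source.send(1)                       // the value arrives, but no completion yet
source.send(completion: .finished)   // only now does the flatMap chain finish
print(events)                        // ["value 10", "finished"]
```

Even though the inner publisher finished right away, "finished" is recorded only after the upstream subject itself completes.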

You already have a publisher that does one thing and then completes: it's the one returned by a(). To wait until both a() and b() complete, you can use combineLatest, since the publisher it creates won't finish until all the publishers it combines have finished. The whole thing looks like:

a()
    .combineLatest(b())
    .print()
    .sink(receiveCompletion: { _ in
        tempBag.removeAll()
    }, receiveValue: { _ in () })
    .store(in: &tempBag)

with the output

receive subscription: (CombineLatest)
request unlimited
1
2
receive value: (((), ()))
receive finished
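
One thing to keep in mind: combineLatest subscribes to a() and b() at the same time, so the two run concurrently rather than one after the other. If b() must start only after a() has produced its value, one possible alternative (a sketch, not the only way) is to keep the flatMap chain and put first() on the subject, so the upstream completes after a single value. The stepA and stepB functions below are hypothetical synchronous stand-ins for a() and b(), and log is just a recorder for the demo.

```swift
import Combine

// Hypothetical recorder for the order of events.
var log: [String] = []

// Stand-ins for a() and b(): each records its label, emits once, then completes.
func stepA() -> AnyPublisher<Void, Never> {
    Just(())
        .handleEvents(receiveOutput: { _ in log.append("1") })
        .eraseToAnyPublisher()
}

func stepB() -> AnyPublisher<Void, Never> {
    Just(())
        .handleEvents(receiveOutput: { _ in log.append("2") })
        .eraseToAnyPublisher()
}

let subject = CurrentValueSubject<Int, Never>(1)

let cancellable = subject
    .first()                        // take one value, then complete the upstream
    .flatMap { _ in stepA() }
    .flatMap { _ in stepB() }       // subscribed only after stepA has emitted
    .sink(receiveCompletion: { _ in log.append("finished") },
          receiveValue: { _ in log.append("value") })

print(log)
```

Because first() finishes the upstream after one element, both flatMap operators can finish once their inner publishers are done, even though the subject itself never completes.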

Upvotes: 2
